// async_executor/static_executors.rs

use crate::{debug_state, Executor, LocalExecutor, State};
use alloc::boxed::Box;
use async_task::{Builder, Runnable, Task};
use core::{
    cell::UnsafeCell,
    fmt,
    future::Future,
    marker::PhantomData,
    panic::{RefUnwindSafe, UnwindSafe},
    sync::atomic::Ordering,
};
use slab::Slab;
use std::sync::PoisonError;

15impl Executor<'static> {
16    /// Consumes the [`Executor`] and intentionally leaks it.
17    ///
18    /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
19    /// [`StaticExecutor`]'s functions are optimized to require fewer synchronizing operations
20    /// when spawning, running, and finishing tasks.
21    ///
22    /// `StaticExecutor` cannot be converted back into a `Executor`, so this operation is
23    /// irreversible without the use of unsafe.
24    ///
25    /// # Example
26    ///
27    /// ```
28    /// use async_executor::Executor;
29    /// use futures_lite::future;
30    ///
31    /// let ex = Executor::new().leak();
32    ///
33    /// let task = ex.spawn(async {
34    ///     println!("Hello world");
35    /// });
36    ///
37    /// future::block_on(ex.run(task));
38    /// ```
39    pub fn leak(self) -> &'static StaticExecutor {
40        let ptr = self.state.load(Ordering::Relaxed);
41
42        let state: &'static State = if ptr.is_null() {
43            Box::leak(Box::new(State::new()))
44        } else {
45            // SAFETY: So long as an Executor lives, it's state pointer will always be valid
46            // when accessed through state_ptr. This executor will live for the full 'static
47            // lifetime so this isn't an arbitrary lifetime extension.
48            unsafe { &*ptr }
49        };
50
51        core::mem::forget(self);
52
53        let mut active = state.active.lock().unwrap_or_else(PoisonError::into_inner);
54        if !active.is_empty() {
55            // Reschedule all of the active tasks.
56            for waker in active.drain() {
57                waker.wake();
58            }
59            // Overwrite to ensure that the slab is deallocated.
60            *active = Slab::new();
61        }
62
63        // SAFETY: StaticExecutor has the same memory layout as State as it's repr(transparent).
64        // The lifetime is not altered: 'static -> 'static.
65        let static_executor: &'static StaticExecutor = unsafe { core::mem::transmute(state) };
66        static_executor
67    }
68}
69
70impl LocalExecutor<'static> {
71    /// Consumes the [`LocalExecutor`] and intentionally leaks it.
72    ///
73    /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
74    /// [`StaticLocalExecutor`]'s functions are optimized to require fewer synchronizing operations
75    /// when spawning, running, and finishing tasks.
76    ///
77    /// `StaticLocalExecutor` cannot be converted back into a `Executor`, so this operation is
78    /// irreversible without the use of unsafe.
79    ///
80    /// # Example
81    ///
82    /// ```
83    /// use async_executor::LocalExecutor;
84    /// use futures_lite::future;
85    ///
86    /// let ex = LocalExecutor::new().leak();
87    ///
88    /// let task = ex.spawn(async {
89    ///     println!("Hello world");
90    /// });
91    ///
92    /// future::block_on(ex.run(task));
93    /// ```
94    pub fn leak(self) -> &'static StaticLocalExecutor {
95        let ptr = self.inner.state.load(Ordering::Relaxed);
96
97        let state: &'static State = if ptr.is_null() {
98            Box::leak(Box::new(State::new()))
99        } else {
100            // SAFETY: So long as an Executor lives, it's state pointer will always be valid
101            // when accessed through state_ptr. This executor will live for the full 'static
102            // lifetime so this isn't an arbitrary lifetime extension.
103            unsafe { &*ptr }
104        };
105
106        core::mem::forget(self);
107
108        let mut active = state.active.lock().unwrap_or_else(PoisonError::into_inner);
109        if !active.is_empty() {
110            // Reschedule all of the active tasks.
111            for waker in active.drain() {
112                waker.wake();
113            }
114            // Overwrite to ensure that the slab is deallocated.
115            *active = Slab::new();
116        }
117
118        // SAFETY: StaticLocalExecutor has the same memory layout as State as it's repr(transparent).
119        // The lifetime is not altered: 'static -> 'static.
120        let static_executor: &'static StaticLocalExecutor = unsafe { core::mem::transmute(state) };
121        static_executor
122    }
123}
124
125/// A static-lifetimed async [`Executor`].
126///
127/// This is primarily intended to be used in [`static`] variables, or types intended to be used, or can be created in non-static
128/// contexts via [`Executor::leak`].
129///
130/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed.
131/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases.
132///
133/// As this type does not implement `Drop`, losing the handle to the executor or failing
134/// to consistently drive the executor with [`StaticExecutor::tick`] or
135/// [`StaticExecutor::run`] will cause the all spawned tasks to permanently leak. Any
136/// tasks at the time will not be cancelled.
137///
138/// [`static`]: https://doc.rust-lang.org/core/keyword.static.html
139#[repr(transparent)]
140pub struct StaticExecutor {
141    state: State,
142}
143
144// SAFETY: Executor stores no thread local state that can be accessed via other thread.
145unsafe impl Send for StaticExecutor {}
146// SAFETY: Executor internally synchronizes all of it's operations internally.
147unsafe impl Sync for StaticExecutor {}
148
149impl UnwindSafe for StaticExecutor {}
150impl RefUnwindSafe for StaticExecutor {}
151
152impl fmt::Debug for StaticExecutor {
153    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154        debug_state(&self.state, "StaticExecutor", f)
155    }
156}
157
158impl StaticExecutor {
159    /// Creates a new StaticExecutor.
160    ///
161    /// # Examples
162    ///
163    /// ```
164    /// use async_executor::StaticExecutor;
165    ///
166    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
167    /// ```
168    pub const fn new() -> Self {
169        Self {
170            state: State::new(),
171        }
172    }
173
174    /// Spawns a task onto the executor.
175    ///
176    /// Note: unlike [`Executor::spawn`], this function requires being called with a `'static`
177    /// borrow on the executor.
178    ///
179    /// # Examples
180    ///
181    /// ```
182    /// use async_executor::StaticExecutor;
183    ///
184    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
185    ///
186    /// let task = EXECUTOR.spawn(async {
187    ///     println!("Hello world");
188    /// });
189    /// ```
190    pub fn spawn<T: Send + 'static>(
191        &'static self,
192        future: impl Future<Output = T> + Send + 'static,
193    ) -> Task<T> {
194        let (runnable, task) = Builder::new()
195            .propagate_panic(true)
196            .spawn(|()| future, self.schedule());
197        runnable.schedule();
198        task
199    }
200
201    /// Spawns a non-`'static` task onto the executor.
202    ///
203    /// ## Safety
204    ///
205    /// The caller must ensure that the returned task terminates
206    /// or is cancelled before the end of 'a.
207    pub unsafe fn spawn_scoped<'a, T: Send + 'a>(
208        &'static self,
209        future: impl Future<Output = T> + Send + 'a,
210    ) -> Task<T> {
211        // SAFETY:
212        //
213        // - `future` is `Send`
214        // - `future` is not `'static`, but the caller guarantees that the
215        //    task, and thus its `Runnable` must not live longer than `'a`.
216        // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
217        //    Therefore we do not need to worry about what is done with the
218        //    `Waker`.
219        let (runnable, task) = unsafe {
220            Builder::new()
221                .propagate_panic(true)
222                .spawn_unchecked(|()| future, self.schedule())
223        };
224        runnable.schedule();
225        task
226    }
227
228    /// Attempts to run a task if at least one is scheduled.
229    ///
230    /// Running a scheduled task means simply polling its future once.
231    ///
232    /// # Examples
233    ///
234    /// ```
235    /// use async_executor::StaticExecutor;
236    ///
237    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
238    ///
239    /// assert!(!EXECUTOR.try_tick()); // no tasks to run
240    ///
241    /// let task = EXECUTOR.spawn(async {
242    ///     println!("Hello world");
243    /// });
244    ///
245    /// assert!(EXECUTOR.try_tick()); // a task was found
246    /// ```
247    pub fn try_tick(&self) -> bool {
248        self.state.try_tick()
249    }
250
251    /// Runs a single task.
252    ///
253    /// Running a task means simply polling its future once.
254    ///
255    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
256    ///
257    /// # Examples
258    ///
259    /// ```
260    /// use async_executor::StaticExecutor;
261    /// use futures_lite::future;
262    ///
263    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
264    ///
265    /// let task = EXECUTOR.spawn(async {
266    ///     println!("Hello world");
267    /// });
268    ///
269    /// future::block_on(EXECUTOR.tick()); // runs the task
270    /// ```
271    pub async fn tick(&self) {
272        self.state.tick().await;
273    }
274
275    /// Runs the executor until the given future completes.
276    ///
277    /// # Examples
278    ///
279    /// ```
280    /// use async_executor::StaticExecutor;
281    /// use futures_lite::future;
282    ///
283    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
284    ///
285    /// let task = EXECUTOR.spawn(async { 1 + 2 });
286    /// let res = future::block_on(EXECUTOR.run(async { task.await * 2 }));
287    ///
288    /// assert_eq!(res, 6);
289    /// ```
290    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
291        self.state.run(future).await
292    }
293
294    /// Returns a function that schedules a runnable task when it gets woken up.
295    fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
296        let state: &'static State = &self.state;
297        // TODO: If possible, push into the current local queue and notify the ticker.
298        move |runnable| {
299            let result = state.queue.push(runnable);
300            debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail.
301            state.notify();
302        }
303    }
304}
305
306impl Default for StaticExecutor {
307    fn default() -> Self {
308        Self::new()
309    }
310}
311
312/// A static async [`LocalExecutor`] created from [`LocalExecutor::leak`].
313///
314/// This is primarily intended to be used in [`thread_local`] variables, or can be created in non-static
315/// contexts via [`LocalExecutor::leak`].
316///
317/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed.
318/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases.
319///
320/// As this type does not implement `Drop`, losing the handle to the executor or failing
321/// to consistently drive the executor with [`StaticLocalExecutor::tick`] or
322/// [`StaticLocalExecutor::run`] will cause the all spawned tasks to permanently leak. Any
323/// tasks at the time will not be cancelled.
324///
325/// [`thread_local]: https://doc.rust-lang.org/std/macro.thread_local.html
326#[repr(transparent)]
327pub struct StaticLocalExecutor {
328    state: State,
329    marker_: PhantomData<UnsafeCell<()>>,
330}
331
332impl UnwindSafe for StaticLocalExecutor {}
333impl RefUnwindSafe for StaticLocalExecutor {}
334
335impl fmt::Debug for StaticLocalExecutor {
336    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
337        debug_state(&self.state, "StaticLocalExecutor", f)
338    }
339}
340
341impl StaticLocalExecutor {
342    /// Creates a new StaticLocalExecutor.
343    ///
344    /// # Examples
345    ///
346    /// ```
347    /// use async_executor::StaticLocalExecutor;
348    ///
349    /// thread_local! {
350    ///     static EXECUTOR: StaticLocalExecutor = StaticLocalExecutor::new();
351    /// }
352    /// ```
353    pub const fn new() -> Self {
354        Self {
355            state: State::new(),
356            marker_: PhantomData,
357        }
358    }
359
360    /// Spawns a task onto the executor.
361    ///
362    /// Note: unlike [`LocalExecutor::spawn`], this function requires being called with a `'static`
363    /// borrow on the executor.
364    ///
365    /// # Examples
366    ///
367    /// ```
368    /// use async_executor::LocalExecutor;
369    ///
370    /// let ex = LocalExecutor::new().leak();
371    ///
372    /// let task = ex.spawn(async {
373    ///     println!("Hello world");
374    /// });
375    /// ```
376    pub fn spawn<T: 'static>(&'static self, future: impl Future<Output = T> + 'static) -> Task<T> {
377        let (runnable, task) = Builder::new()
378            .propagate_panic(true)
379            .spawn_local(|()| future, self.schedule());
380        runnable.schedule();
381        task
382    }
383
384    /// Spawns a non-`'static` task onto the executor.
385    ///
386    /// ## Safety
387    ///
388    /// The caller must ensure that the returned task terminates
389    /// or is cancelled before the end of 'a.
390    pub unsafe fn spawn_scoped<'a, T: 'a>(
391        &'static self,
392        future: impl Future<Output = T> + 'a,
393    ) -> Task<T> {
394        // SAFETY:
395        //
396        // - `future` is not `Send` but `StaticLocalExecutor` is `!Sync`,
397        //   `try_tick`, `tick` and `run` can only be called from the origin
398        //    thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only
399        //    be called from the origin thread, ensuring that `future` and the executor
400        //    share the same origin thread. The `Runnable` can be scheduled from other
401        //    threads, but because of the above `Runnable` can only be called or
402        //    dropped on the origin thread.
403        // - `future` is not `'static`, but the caller guarantees that the
404        //    task, and thus its `Runnable` must not live longer than `'a`.
405        // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
406        //    Therefore we do not need to worry about what is done with the
407        //    `Waker`.
408        let (runnable, task) = unsafe {
409            Builder::new()
410                .propagate_panic(true)
411                .spawn_unchecked(|()| future, self.schedule())
412        };
413        runnable.schedule();
414        task
415    }
416
417    /// Attempts to run a task if at least one is scheduled.
418    ///
419    /// Running a scheduled task means simply polling its future once.
420    ///
421    /// # Examples
422    ///
423    /// ```
424    /// use async_executor::LocalExecutor;
425    ///
426    /// let ex = LocalExecutor::new().leak();
427    /// assert!(!ex.try_tick()); // no tasks to run
428    ///
429    /// let task = ex.spawn(async {
430    ///     println!("Hello world");
431    /// });
432    /// assert!(ex.try_tick()); // a task was found
433    /// ```
434    pub fn try_tick(&self) -> bool {
435        self.state.try_tick()
436    }
437
438    /// Runs a single task.
439    ///
440    /// Running a task means simply polling its future once.
441    ///
442    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
443    ///
444    /// # Examples
445    ///
446    /// ```
447    /// use async_executor::LocalExecutor;
448    /// use futures_lite::future;
449    ///
450    /// let ex = LocalExecutor::new().leak();
451    ///
452    /// let task = ex.spawn(async {
453    ///     println!("Hello world");
454    /// });
455    /// future::block_on(ex.tick()); // runs the task
456    /// ```
457    pub async fn tick(&self) {
458        self.state.tick().await;
459    }
460
461    /// Runs the executor until the given future completes.
462    ///
463    /// # Examples
464    ///
465    /// ```
466    /// use async_executor::LocalExecutor;
467    /// use futures_lite::future;
468    ///
469    /// let ex = LocalExecutor::new().leak();
470    ///
471    /// let task = ex.spawn(async { 1 + 2 });
472    /// let res = future::block_on(ex.run(async { task.await * 2 }));
473    ///
474    /// assert_eq!(res, 6);
475    /// ```
476    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
477        self.state.run(future).await
478    }
479
480    /// Returns a function that schedules a runnable task when it gets woken up.
481    fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
482        let state: &'static State = &self.state;
483        // TODO: If possible, push into the current local queue and notify the ticker.
484        move |runnable| {
485            let result = state.queue.push(runnable);
486            debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail.
487            state.notify();
488        }
489    }
490}
491
492impl Default for StaticLocalExecutor {
493    fn default() -> Self {
494        Self::new()
495    }
496}