async_executor/
static_executors.rs

1use crate::{debug_state, Executor, LocalExecutor, State};
2use async_task::{Builder, Runnable, Task};
3use slab::Slab;
4use std::{
5    cell::UnsafeCell,
6    fmt,
7    future::Future,
8    marker::PhantomData,
9    panic::{RefUnwindSafe, UnwindSafe},
10    sync::PoisonError,
11};
12
13impl Executor<'static> {
14    /// Consumes the [`Executor`] and intentionally leaks it.
15    ///
16    /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
17    /// [`StaticExecutor`]'s functions are optimized to require fewer synchronizing operations
18    /// when spawning, running, and finishing tasks.
19    ///
20    /// `StaticExecutor` cannot be converted back into a `Executor`, so this operation is
21    /// irreversible without the use of unsafe.
22    ///
23    /// # Example
24    ///
25    /// ```
26    /// use async_executor::Executor;
27    /// use futures_lite::future;
28    ///
29    /// let ex = Executor::new().leak();
30    ///
31    /// let task = ex.spawn(async {
32    ///     println!("Hello world");
33    /// });
34    ///
35    /// future::block_on(ex.run(task));
36    /// ```
37    pub fn leak(self) -> &'static StaticExecutor {
38        let ptr = self.state_ptr();
39        // SAFETY: So long as an Executor lives, it's state pointer will always be valid
40        // when accessed through state_ptr. This executor will live for the full 'static
41        // lifetime so this isn't an arbitrary lifetime extension.
42        let state: &'static State = unsafe { &*ptr };
43
44        std::mem::forget(self);
45
46        let mut active = state.active.lock().unwrap_or_else(PoisonError::into_inner);
47        if !active.is_empty() {
48            // Reschedule all of the active tasks.
49            for waker in active.drain() {
50                waker.wake();
51            }
52            // Overwrite to ensure that the slab is deallocated.
53            *active = Slab::new();
54        }
55
56        // SAFETY: StaticExecutor has the same memory layout as State as it's repr(transparent).
57        // The lifetime is not altered: 'static -> 'static.
58        let static_executor: &'static StaticExecutor = unsafe { std::mem::transmute(state) };
59        static_executor
60    }
61}
62
63impl LocalExecutor<'static> {
64    /// Consumes the [`LocalExecutor`] and intentionally leaks it.
65    ///
66    /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
67    /// [`StaticLocalExecutor`]'s functions are optimized to require fewer synchronizing operations
68    /// when spawning, running, and finishing tasks.
69    ///
70    /// `StaticLocalExecutor` cannot be converted back into a `Executor`, so this operation is
71    /// irreversible without the use of unsafe.
72    ///
73    /// # Example
74    ///
75    /// ```
76    /// use async_executor::LocalExecutor;
77    /// use futures_lite::future;
78    ///
79    /// let ex = LocalExecutor::new().leak();
80    ///
81    /// let task = ex.spawn(async {
82    ///     println!("Hello world");
83    /// });
84    ///
85    /// future::block_on(ex.run(task));
86    /// ```
87    pub fn leak(self) -> &'static StaticLocalExecutor {
88        let ptr = self.inner.state_ptr();
89        // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid
90        // when accessed through state_ptr. This executor will live for the full 'static
91        // lifetime so this isn't an arbitrary lifetime extension.
92        let state: &'static State = unsafe { &*ptr };
93
94        std::mem::forget(self);
95
96        let mut active = state.active.lock().unwrap_or_else(PoisonError::into_inner);
97        if !active.is_empty() {
98            // Reschedule all of the active tasks.
99            for waker in active.drain() {
100                waker.wake();
101            }
102            // Overwrite to ensure that the slab is deallocated.
103            *active = Slab::new();
104        }
105
106        // SAFETY: StaticLocalExecutor has the same memory layout as State as it's repr(transparent).
107        // The lifetime is not altered: 'static -> 'static.
108        let static_executor: &'static StaticLocalExecutor = unsafe { std::mem::transmute(state) };
109        static_executor
110    }
111}
112
113/// A static-lifetimed async [`Executor`].
114///
115/// This is primarily intended to be used in [`static`] variables, or types intended to be used, or can be created in non-static
116/// contexts via [`Executor::leak`].
117///
118/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed.
119/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases.
120///
121/// As this type does not implement `Drop`, losing the handle to the executor or failing
122/// to consistently drive the executor with [`StaticExecutor::tick`] or
123/// [`StaticExecutor::run`] will cause the all spawned tasks to permanently leak. Any
124/// tasks at the time will not be cancelled.
125///
126/// [`static`]: https://doc.rust-lang.org/std/keyword.static.html
127#[repr(transparent)]
128pub struct StaticExecutor {
129    state: State,
130}
131
132// SAFETY: Executor stores no thread local state that can be accessed via other thread.
133unsafe impl Send for StaticExecutor {}
134// SAFETY: Executor internally synchronizes all of it's operations internally.
135unsafe impl Sync for StaticExecutor {}
136
137impl UnwindSafe for StaticExecutor {}
138impl RefUnwindSafe for StaticExecutor {}
139
140impl fmt::Debug for StaticExecutor {
141    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142        debug_state(&self.state, "StaticExecutor", f)
143    }
144}
145
146impl StaticExecutor {
147    /// Creates a new StaticExecutor.
148    ///
149    /// # Examples
150    ///
151    /// ```
152    /// use async_executor::StaticExecutor;
153    ///
154    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
155    /// ```
156    pub const fn new() -> Self {
157        Self {
158            state: State::new(),
159        }
160    }
161
162    /// Spawns a task onto the executor.
163    ///
164    /// Note: unlike [`Executor::spawn`], this function requires being called with a `'static`
165    /// borrow on the executor.
166    ///
167    /// # Examples
168    ///
169    /// ```
170    /// use async_executor::StaticExecutor;
171    ///
172    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
173    ///
174    /// let task = EXECUTOR.spawn(async {
175    ///     println!("Hello world");
176    /// });
177    /// ```
178    pub fn spawn<T: Send + 'static>(
179        &'static self,
180        future: impl Future<Output = T> + Send + 'static,
181    ) -> Task<T> {
182        let (runnable, task) = Builder::new()
183            .propagate_panic(true)
184            .spawn(|()| future, self.schedule());
185        runnable.schedule();
186        task
187    }
188
189    /// Spawns a non-`'static` task onto the executor.
190    ///
191    /// ## Safety
192    ///
193    /// The caller must ensure that the returned task terminates
194    /// or is cancelled before the end of 'a.
195    pub unsafe fn spawn_scoped<'a, T: Send + 'a>(
196        &'static self,
197        future: impl Future<Output = T> + Send + 'a,
198    ) -> Task<T> {
199        // SAFETY:
200        //
201        // - `future` is `Send`
202        // - `future` is not `'static`, but the caller guarantees that the
203        //    task, and thus its `Runnable` must not live longer than `'a`.
204        // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
205        //    Therefore we do not need to worry about what is done with the
206        //    `Waker`.
207        let (runnable, task) = unsafe {
208            Builder::new()
209                .propagate_panic(true)
210                .spawn_unchecked(|()| future, self.schedule())
211        };
212        runnable.schedule();
213        task
214    }
215
216    /// Attempts to run a task if at least one is scheduled.
217    ///
218    /// Running a scheduled task means simply polling its future once.
219    ///
220    /// # Examples
221    ///
222    /// ```
223    /// use async_executor::StaticExecutor;
224    ///
225    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
226    ///
227    /// assert!(!EXECUTOR.try_tick()); // no tasks to run
228    ///
229    /// let task = EXECUTOR.spawn(async {
230    ///     println!("Hello world");
231    /// });
232    ///
233    /// assert!(EXECUTOR.try_tick()); // a task was found
234    /// ```
235    pub fn try_tick(&self) -> bool {
236        self.state.try_tick()
237    }
238
239    /// Runs a single task.
240    ///
241    /// Running a task means simply polling its future once.
242    ///
243    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
244    ///
245    /// # Examples
246    ///
247    /// ```
248    /// use async_executor::StaticExecutor;
249    /// use futures_lite::future;
250    ///
251    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
252    ///
253    /// let task = EXECUTOR.spawn(async {
254    ///     println!("Hello world");
255    /// });
256    ///
257    /// future::block_on(EXECUTOR.tick()); // runs the task
258    /// ```
259    pub async fn tick(&self) {
260        self.state.tick().await;
261    }
262
263    /// Runs the executor until the given future completes.
264    ///
265    /// # Examples
266    ///
267    /// ```
268    /// use async_executor::StaticExecutor;
269    /// use futures_lite::future;
270    ///
271    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
272    ///
273    /// let task = EXECUTOR.spawn(async { 1 + 2 });
274    /// let res = future::block_on(EXECUTOR.run(async { task.await * 2 }));
275    ///
276    /// assert_eq!(res, 6);
277    /// ```
278    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
279        self.state.run(future).await
280    }
281
282    /// Returns a function that schedules a runnable task when it gets woken up.
283    fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
284        let state: &'static State = &self.state;
285        // TODO: If possible, push into the current local queue and notify the ticker.
286        move |runnable| {
287            let result = state.queue.push(runnable);
288            debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail.
289            state.notify();
290        }
291    }
292}
293
294impl Default for StaticExecutor {
295    fn default() -> Self {
296        Self::new()
297    }
298}
299
300/// A static async [`LocalExecutor`] created from [`LocalExecutor::leak`].
301///
302/// This is primarily intended to be used in [`thread_local`] variables, or can be created in non-static
303/// contexts via [`LocalExecutor::leak`].
304///
305/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed.
306/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases.
307///
308/// As this type does not implement `Drop`, losing the handle to the executor or failing
309/// to consistently drive the executor with [`StaticLocalExecutor::tick`] or
310/// [`StaticLocalExecutor::run`] will cause the all spawned tasks to permanently leak. Any
311/// tasks at the time will not be cancelled.
312///
313/// [`thread_local]: https://doc.rust-lang.org/std/macro.thread_local.html
314#[repr(transparent)]
315pub struct StaticLocalExecutor {
316    state: State,
317    marker_: PhantomData<UnsafeCell<()>>,
318}
319
320impl UnwindSafe for StaticLocalExecutor {}
321impl RefUnwindSafe for StaticLocalExecutor {}
322
323impl fmt::Debug for StaticLocalExecutor {
324    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
325        debug_state(&self.state, "StaticLocalExecutor", f)
326    }
327}
328
329impl StaticLocalExecutor {
330    /// Creates a new StaticLocalExecutor.
331    ///
332    /// # Examples
333    ///
334    /// ```
335    /// use async_executor::StaticLocalExecutor;
336    ///
337    /// thread_local! {
338    ///     static EXECUTOR: StaticLocalExecutor = StaticLocalExecutor::new();
339    /// }
340    /// ```
341    pub const fn new() -> Self {
342        Self {
343            state: State::new(),
344            marker_: PhantomData,
345        }
346    }
347
348    /// Spawns a task onto the executor.
349    ///
350    /// Note: unlike [`LocalExecutor::spawn`], this function requires being called with a `'static`
351    /// borrow on the executor.
352    ///
353    /// # Examples
354    ///
355    /// ```
356    /// use async_executor::LocalExecutor;
357    ///
358    /// let ex = LocalExecutor::new().leak();
359    ///
360    /// let task = ex.spawn(async {
361    ///     println!("Hello world");
362    /// });
363    /// ```
364    pub fn spawn<T: 'static>(&'static self, future: impl Future<Output = T> + 'static) -> Task<T> {
365        let (runnable, task) = Builder::new()
366            .propagate_panic(true)
367            .spawn_local(|()| future, self.schedule());
368        runnable.schedule();
369        task
370    }
371
372    /// Spawns a non-`'static` task onto the executor.
373    ///
374    /// ## Safety
375    ///
376    /// The caller must ensure that the returned task terminates
377    /// or is cancelled before the end of 'a.
378    pub unsafe fn spawn_scoped<'a, T: 'a>(
379        &'static self,
380        future: impl Future<Output = T> + 'a,
381    ) -> Task<T> {
382        // SAFETY:
383        //
384        // - `future` is not `Send` but `StaticLocalExecutor` is `!Sync`,
385        //   `try_tick`, `tick` and `run` can only be called from the origin
386        //    thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only
387        //    be called from the origin thread, ensuring that `future` and the executor
388        //    share the same origin thread. The `Runnable` can be scheduled from other
389        //    threads, but because of the above `Runnable` can only be called or
390        //    dropped on the origin thread.
391        // - `future` is not `'static`, but the caller guarantees that the
392        //    task, and thus its `Runnable` must not live longer than `'a`.
393        // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
394        //    Therefore we do not need to worry about what is done with the
395        //    `Waker`.
396        let (runnable, task) = unsafe {
397            Builder::new()
398                .propagate_panic(true)
399                .spawn_unchecked(|()| future, self.schedule())
400        };
401        runnable.schedule();
402        task
403    }
404
405    /// Attempts to run a task if at least one is scheduled.
406    ///
407    /// Running a scheduled task means simply polling its future once.
408    ///
409    /// # Examples
410    ///
411    /// ```
412    /// use async_executor::LocalExecutor;
413    ///
414    /// let ex = LocalExecutor::new().leak();
415    /// assert!(!ex.try_tick()); // no tasks to run
416    ///
417    /// let task = ex.spawn(async {
418    ///     println!("Hello world");
419    /// });
420    /// assert!(ex.try_tick()); // a task was found
421    /// ```
422    pub fn try_tick(&self) -> bool {
423        self.state.try_tick()
424    }
425
426    /// Runs a single task.
427    ///
428    /// Running a task means simply polling its future once.
429    ///
430    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
431    ///
432    /// # Examples
433    ///
434    /// ```
435    /// use async_executor::LocalExecutor;
436    /// use futures_lite::future;
437    ///
438    /// let ex = LocalExecutor::new().leak();
439    ///
440    /// let task = ex.spawn(async {
441    ///     println!("Hello world");
442    /// });
443    /// future::block_on(ex.tick()); // runs the task
444    /// ```
445    pub async fn tick(&self) {
446        self.state.tick().await;
447    }
448
449    /// Runs the executor until the given future completes.
450    ///
451    /// # Examples
452    ///
453    /// ```
454    /// use async_executor::LocalExecutor;
455    /// use futures_lite::future;
456    ///
457    /// let ex = LocalExecutor::new().leak();
458    ///
459    /// let task = ex.spawn(async { 1 + 2 });
460    /// let res = future::block_on(ex.run(async { task.await * 2 }));
461    ///
462    /// assert_eq!(res, 6);
463    /// ```
464    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
465        self.state.run(future).await
466    }
467
468    /// Returns a function that schedules a runnable task when it gets woken up.
469    fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
470        let state: &'static State = &self.state;
471        // TODO: If possible, push into the current local queue and notify the ticker.
472        move |runnable| {
473            let result = state.queue.push(runnable);
474            debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail.
475            state.notify();
476        }
477    }
478}
479
480impl Default for StaticLocalExecutor {
481    fn default() -> Self {
482        Self::new()
483    }
484}