async_executor/static_executors.rs
1use crate::{debug_state, Executor, LocalExecutor, State};
2use alloc::boxed::Box;
3use async_task::{Builder, Runnable, Task};
4use core::{
5 cell::UnsafeCell,
6 fmt,
7 future::Future,
8 marker::PhantomData,
9 panic::{RefUnwindSafe, UnwindSafe},
10 sync::atomic::Ordering,
11};
12use slab::Slab;
13use std::sync::PoisonError;
14
15impl Executor<'static> {
16 /// Consumes the [`Executor`] and intentionally leaks it.
17 ///
18 /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
19 /// [`StaticExecutor`]'s functions are optimized to require fewer synchronizing operations
20 /// when spawning, running, and finishing tasks.
21 ///
22 /// `StaticExecutor` cannot be converted back into a `Executor`, so this operation is
23 /// irreversible without the use of unsafe.
24 ///
25 /// # Example
26 ///
27 /// ```
28 /// use async_executor::Executor;
29 /// use futures_lite::future;
30 ///
31 /// let ex = Executor::new().leak();
32 ///
33 /// let task = ex.spawn(async {
34 /// println!("Hello world");
35 /// });
36 ///
37 /// future::block_on(ex.run(task));
38 /// ```
39 pub fn leak(self) -> &'static StaticExecutor {
40 let ptr = self.state.load(Ordering::Relaxed);
41
42 let state: &'static State = if ptr.is_null() {
43 Box::leak(Box::new(State::new()))
44 } else {
45 // SAFETY: So long as an Executor lives, it's state pointer will always be valid
46 // when accessed through state_ptr. This executor will live for the full 'static
47 // lifetime so this isn't an arbitrary lifetime extension.
48 unsafe { &*ptr }
49 };
50
51 core::mem::forget(self);
52
53 let mut active = state.active.lock().unwrap_or_else(PoisonError::into_inner);
54 if !active.is_empty() {
55 // Reschedule all of the active tasks.
56 for waker in active.drain() {
57 waker.wake();
58 }
59 // Overwrite to ensure that the slab is deallocated.
60 *active = Slab::new();
61 }
62
63 // SAFETY: StaticExecutor has the same memory layout as State as it's repr(transparent).
64 // The lifetime is not altered: 'static -> 'static.
65 let static_executor: &'static StaticExecutor = unsafe { core::mem::transmute(state) };
66 static_executor
67 }
68}
69
70impl LocalExecutor<'static> {
71 /// Consumes the [`LocalExecutor`] and intentionally leaks it.
72 ///
73 /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
74 /// [`StaticLocalExecutor`]'s functions are optimized to require fewer synchronizing operations
75 /// when spawning, running, and finishing tasks.
76 ///
77 /// `StaticLocalExecutor` cannot be converted back into a `Executor`, so this operation is
78 /// irreversible without the use of unsafe.
79 ///
80 /// # Example
81 ///
82 /// ```
83 /// use async_executor::LocalExecutor;
84 /// use futures_lite::future;
85 ///
86 /// let ex = LocalExecutor::new().leak();
87 ///
88 /// let task = ex.spawn(async {
89 /// println!("Hello world");
90 /// });
91 ///
92 /// future::block_on(ex.run(task));
93 /// ```
94 pub fn leak(self) -> &'static StaticLocalExecutor {
95 let ptr = self.inner.state.load(Ordering::Relaxed);
96
97 let state: &'static State = if ptr.is_null() {
98 Box::leak(Box::new(State::new()))
99 } else {
100 // SAFETY: So long as an Executor lives, it's state pointer will always be valid
101 // when accessed through state_ptr. This executor will live for the full 'static
102 // lifetime so this isn't an arbitrary lifetime extension.
103 unsafe { &*ptr }
104 };
105
106 core::mem::forget(self);
107
108 let mut active = state.active.lock().unwrap_or_else(PoisonError::into_inner);
109 if !active.is_empty() {
110 // Reschedule all of the active tasks.
111 for waker in active.drain() {
112 waker.wake();
113 }
114 // Overwrite to ensure that the slab is deallocated.
115 *active = Slab::new();
116 }
117
118 // SAFETY: StaticLocalExecutor has the same memory layout as State as it's repr(transparent).
119 // The lifetime is not altered: 'static -> 'static.
120 let static_executor: &'static StaticLocalExecutor = unsafe { core::mem::transmute(state) };
121 static_executor
122 }
123}
124
125/// A static-lifetimed async [`Executor`].
126///
127/// This is primarily intended to be used in [`static`] variables, or types intended to be used, or can be created in non-static
128/// contexts via [`Executor::leak`].
129///
130/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed.
131/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases.
132///
133/// As this type does not implement `Drop`, losing the handle to the executor or failing
134/// to consistently drive the executor with [`StaticExecutor::tick`] or
135/// [`StaticExecutor::run`] will cause the all spawned tasks to permanently leak. Any
136/// tasks at the time will not be cancelled.
137///
138/// [`static`]: https://doc.rust-lang.org/core/keyword.static.html
139#[repr(transparent)]
140pub struct StaticExecutor {
141 state: State,
142}
143
144// SAFETY: Executor stores no thread local state that can be accessed via other thread.
145unsafe impl Send for StaticExecutor {}
146// SAFETY: Executor internally synchronizes all of it's operations internally.
147unsafe impl Sync for StaticExecutor {}
148
149impl UnwindSafe for StaticExecutor {}
150impl RefUnwindSafe for StaticExecutor {}
151
152impl fmt::Debug for StaticExecutor {
153 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154 debug_state(&self.state, "StaticExecutor", f)
155 }
156}
157
158impl StaticExecutor {
159 /// Creates a new StaticExecutor.
160 ///
161 /// # Examples
162 ///
163 /// ```
164 /// use async_executor::StaticExecutor;
165 ///
166 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
167 /// ```
168 pub const fn new() -> Self {
169 Self {
170 state: State::new(),
171 }
172 }
173
174 /// Spawns a task onto the executor.
175 ///
176 /// Note: unlike [`Executor::spawn`], this function requires being called with a `'static`
177 /// borrow on the executor.
178 ///
179 /// # Examples
180 ///
181 /// ```
182 /// use async_executor::StaticExecutor;
183 ///
184 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
185 ///
186 /// let task = EXECUTOR.spawn(async {
187 /// println!("Hello world");
188 /// });
189 /// ```
190 pub fn spawn<T: Send + 'static>(
191 &'static self,
192 future: impl Future<Output = T> + Send + 'static,
193 ) -> Task<T> {
194 let (runnable, task) = Builder::new()
195 .propagate_panic(true)
196 .spawn(|()| future, self.schedule());
197 runnable.schedule();
198 task
199 }
200
201 /// Spawns a non-`'static` task onto the executor.
202 ///
203 /// ## Safety
204 ///
205 /// The caller must ensure that the returned task terminates
206 /// or is cancelled before the end of 'a.
207 pub unsafe fn spawn_scoped<'a, T: Send + 'a>(
208 &'static self,
209 future: impl Future<Output = T> + Send + 'a,
210 ) -> Task<T> {
211 // SAFETY:
212 //
213 // - `future` is `Send`
214 // - `future` is not `'static`, but the caller guarantees that the
215 // task, and thus its `Runnable` must not live longer than `'a`.
216 // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
217 // Therefore we do not need to worry about what is done with the
218 // `Waker`.
219 let (runnable, task) = unsafe {
220 Builder::new()
221 .propagate_panic(true)
222 .spawn_unchecked(|()| future, self.schedule())
223 };
224 runnable.schedule();
225 task
226 }
227
228 /// Attempts to run a task if at least one is scheduled.
229 ///
230 /// Running a scheduled task means simply polling its future once.
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// use async_executor::StaticExecutor;
236 ///
237 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
238 ///
239 /// assert!(!EXECUTOR.try_tick()); // no tasks to run
240 ///
241 /// let task = EXECUTOR.spawn(async {
242 /// println!("Hello world");
243 /// });
244 ///
245 /// assert!(EXECUTOR.try_tick()); // a task was found
246 /// ```
247 pub fn try_tick(&self) -> bool {
248 self.state.try_tick()
249 }
250
251 /// Runs a single task.
252 ///
253 /// Running a task means simply polling its future once.
254 ///
255 /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
256 ///
257 /// # Examples
258 ///
259 /// ```
260 /// use async_executor::StaticExecutor;
261 /// use futures_lite::future;
262 ///
263 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
264 ///
265 /// let task = EXECUTOR.spawn(async {
266 /// println!("Hello world");
267 /// });
268 ///
269 /// future::block_on(EXECUTOR.tick()); // runs the task
270 /// ```
271 pub async fn tick(&self) {
272 self.state.tick().await;
273 }
274
275 /// Runs the executor until the given future completes.
276 ///
277 /// # Examples
278 ///
279 /// ```
280 /// use async_executor::StaticExecutor;
281 /// use futures_lite::future;
282 ///
283 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
284 ///
285 /// let task = EXECUTOR.spawn(async { 1 + 2 });
286 /// let res = future::block_on(EXECUTOR.run(async { task.await * 2 }));
287 ///
288 /// assert_eq!(res, 6);
289 /// ```
290 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
291 self.state.run(future).await
292 }
293
294 /// Returns a function that schedules a runnable task when it gets woken up.
295 fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
296 let state: &'static State = &self.state;
297 // TODO: If possible, push into the current local queue and notify the ticker.
298 move |runnable| {
299 let result = state.queue.push(runnable);
300 debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail.
301 state.notify();
302 }
303 }
304}
305
306impl Default for StaticExecutor {
307 fn default() -> Self {
308 Self::new()
309 }
310}
311
312/// A static async [`LocalExecutor`] created from [`LocalExecutor::leak`].
313///
314/// This is primarily intended to be used in [`thread_local`] variables, or can be created in non-static
315/// contexts via [`LocalExecutor::leak`].
316///
317/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed.
318/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases.
319///
320/// As this type does not implement `Drop`, losing the handle to the executor or failing
321/// to consistently drive the executor with [`StaticLocalExecutor::tick`] or
322/// [`StaticLocalExecutor::run`] will cause the all spawned tasks to permanently leak. Any
323/// tasks at the time will not be cancelled.
324///
325/// [`thread_local]: https://doc.rust-lang.org/std/macro.thread_local.html
326#[repr(transparent)]
327pub struct StaticLocalExecutor {
328 state: State,
329 marker_: PhantomData<UnsafeCell<()>>,
330}
331
332impl UnwindSafe for StaticLocalExecutor {}
333impl RefUnwindSafe for StaticLocalExecutor {}
334
335impl fmt::Debug for StaticLocalExecutor {
336 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
337 debug_state(&self.state, "StaticLocalExecutor", f)
338 }
339}
340
341impl StaticLocalExecutor {
342 /// Creates a new StaticLocalExecutor.
343 ///
344 /// # Examples
345 ///
346 /// ```
347 /// use async_executor::StaticLocalExecutor;
348 ///
349 /// thread_local! {
350 /// static EXECUTOR: StaticLocalExecutor = StaticLocalExecutor::new();
351 /// }
352 /// ```
353 pub const fn new() -> Self {
354 Self {
355 state: State::new(),
356 marker_: PhantomData,
357 }
358 }
359
360 /// Spawns a task onto the executor.
361 ///
362 /// Note: unlike [`LocalExecutor::spawn`], this function requires being called with a `'static`
363 /// borrow on the executor.
364 ///
365 /// # Examples
366 ///
367 /// ```
368 /// use async_executor::LocalExecutor;
369 ///
370 /// let ex = LocalExecutor::new().leak();
371 ///
372 /// let task = ex.spawn(async {
373 /// println!("Hello world");
374 /// });
375 /// ```
376 pub fn spawn<T: 'static>(&'static self, future: impl Future<Output = T> + 'static) -> Task<T> {
377 let (runnable, task) = Builder::new()
378 .propagate_panic(true)
379 .spawn_local(|()| future, self.schedule());
380 runnable.schedule();
381 task
382 }
383
384 /// Spawns a non-`'static` task onto the executor.
385 ///
386 /// ## Safety
387 ///
388 /// The caller must ensure that the returned task terminates
389 /// or is cancelled before the end of 'a.
390 pub unsafe fn spawn_scoped<'a, T: 'a>(
391 &'static self,
392 future: impl Future<Output = T> + 'a,
393 ) -> Task<T> {
394 // SAFETY:
395 //
396 // - `future` is not `Send` but `StaticLocalExecutor` is `!Sync`,
397 // `try_tick`, `tick` and `run` can only be called from the origin
398 // thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only
399 // be called from the origin thread, ensuring that `future` and the executor
400 // share the same origin thread. The `Runnable` can be scheduled from other
401 // threads, but because of the above `Runnable` can only be called or
402 // dropped on the origin thread.
403 // - `future` is not `'static`, but the caller guarantees that the
404 // task, and thus its `Runnable` must not live longer than `'a`.
405 // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
406 // Therefore we do not need to worry about what is done with the
407 // `Waker`.
408 let (runnable, task) = unsafe {
409 Builder::new()
410 .propagate_panic(true)
411 .spawn_unchecked(|()| future, self.schedule())
412 };
413 runnable.schedule();
414 task
415 }
416
417 /// Attempts to run a task if at least one is scheduled.
418 ///
419 /// Running a scheduled task means simply polling its future once.
420 ///
421 /// # Examples
422 ///
423 /// ```
424 /// use async_executor::LocalExecutor;
425 ///
426 /// let ex = LocalExecutor::new().leak();
427 /// assert!(!ex.try_tick()); // no tasks to run
428 ///
429 /// let task = ex.spawn(async {
430 /// println!("Hello world");
431 /// });
432 /// assert!(ex.try_tick()); // a task was found
433 /// ```
434 pub fn try_tick(&self) -> bool {
435 self.state.try_tick()
436 }
437
438 /// Runs a single task.
439 ///
440 /// Running a task means simply polling its future once.
441 ///
442 /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
443 ///
444 /// # Examples
445 ///
446 /// ```
447 /// use async_executor::LocalExecutor;
448 /// use futures_lite::future;
449 ///
450 /// let ex = LocalExecutor::new().leak();
451 ///
452 /// let task = ex.spawn(async {
453 /// println!("Hello world");
454 /// });
455 /// future::block_on(ex.tick()); // runs the task
456 /// ```
457 pub async fn tick(&self) {
458 self.state.tick().await;
459 }
460
461 /// Runs the executor until the given future completes.
462 ///
463 /// # Examples
464 ///
465 /// ```
466 /// use async_executor::LocalExecutor;
467 /// use futures_lite::future;
468 ///
469 /// let ex = LocalExecutor::new().leak();
470 ///
471 /// let task = ex.spawn(async { 1 + 2 });
472 /// let res = future::block_on(ex.run(async { task.await * 2 }));
473 ///
474 /// assert_eq!(res, 6);
475 /// ```
476 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
477 self.state.run(future).await
478 }
479
480 /// Returns a function that schedules a runnable task when it gets woken up.
481 fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
482 let state: &'static State = &self.state;
483 // TODO: If possible, push into the current local queue and notify the ticker.
484 move |runnable| {
485 let result = state.queue.push(runnable);
486 debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail.
487 state.notify();
488 }
489 }
490}
491
492impl Default for StaticLocalExecutor {
493 fn default() -> Self {
494 Self::new()
495 }
496}