async_executor/static_executors.rs
1use crate::{debug_state, Executor, LocalExecutor, State};
2use async_task::{Builder, Runnable, Task};
3use slab::Slab;
4use std::{
5 cell::UnsafeCell,
6 fmt,
7 future::Future,
8 marker::PhantomData,
9 panic::{RefUnwindSafe, UnwindSafe},
10 sync::PoisonError,
11};
12
13impl Executor<'static> {
14 /// Consumes the [`Executor`] and intentionally leaks it.
15 ///
16 /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
17 /// [`StaticExecutor`]'s functions are optimized to require fewer synchronizing operations
18 /// when spawning, running, and finishing tasks.
19 ///
20 /// `StaticExecutor` cannot be converted back into a `Executor`, so this operation is
21 /// irreversible without the use of unsafe.
22 ///
23 /// # Example
24 ///
25 /// ```
26 /// use async_executor::Executor;
27 /// use futures_lite::future;
28 ///
29 /// let ex = Executor::new().leak();
30 ///
31 /// let task = ex.spawn(async {
32 /// println!("Hello world");
33 /// });
34 ///
35 /// future::block_on(ex.run(task));
36 /// ```
37 pub fn leak(self) -> &'static StaticExecutor {
38 let ptr = self.state_ptr();
39 // SAFETY: So long as an Executor lives, it's state pointer will always be valid
40 // when accessed through state_ptr. This executor will live for the full 'static
41 // lifetime so this isn't an arbitrary lifetime extension.
42 let state: &'static State = unsafe { &*ptr };
43
44 std::mem::forget(self);
45
46 let mut active = state.active.lock().unwrap_or_else(PoisonError::into_inner);
47 if !active.is_empty() {
48 // Reschedule all of the active tasks.
49 for waker in active.drain() {
50 waker.wake();
51 }
52 // Overwrite to ensure that the slab is deallocated.
53 *active = Slab::new();
54 }
55
56 // SAFETY: StaticExecutor has the same memory layout as State as it's repr(transparent).
57 // The lifetime is not altered: 'static -> 'static.
58 let static_executor: &'static StaticExecutor = unsafe { std::mem::transmute(state) };
59 static_executor
60 }
61}
62
63impl LocalExecutor<'static> {
64 /// Consumes the [`LocalExecutor`] and intentionally leaks it.
65 ///
66 /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
67 /// [`StaticLocalExecutor`]'s functions are optimized to require fewer synchronizing operations
68 /// when spawning, running, and finishing tasks.
69 ///
70 /// `StaticLocalExecutor` cannot be converted back into a `Executor`, so this operation is
71 /// irreversible without the use of unsafe.
72 ///
73 /// # Example
74 ///
75 /// ```
76 /// use async_executor::LocalExecutor;
77 /// use futures_lite::future;
78 ///
79 /// let ex = LocalExecutor::new().leak();
80 ///
81 /// let task = ex.spawn(async {
82 /// println!("Hello world");
83 /// });
84 ///
85 /// future::block_on(ex.run(task));
86 /// ```
87 pub fn leak(self) -> &'static StaticLocalExecutor {
88 let ptr = self.inner.state_ptr();
89 // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid
90 // when accessed through state_ptr. This executor will live for the full 'static
91 // lifetime so this isn't an arbitrary lifetime extension.
92 let state: &'static State = unsafe { &*ptr };
93
94 std::mem::forget(self);
95
96 let mut active = state.active.lock().unwrap_or_else(PoisonError::into_inner);
97 if !active.is_empty() {
98 // Reschedule all of the active tasks.
99 for waker in active.drain() {
100 waker.wake();
101 }
102 // Overwrite to ensure that the slab is deallocated.
103 *active = Slab::new();
104 }
105
106 // SAFETY: StaticLocalExecutor has the same memory layout as State as it's repr(transparent).
107 // The lifetime is not altered: 'static -> 'static.
108 let static_executor: &'static StaticLocalExecutor = unsafe { std::mem::transmute(state) };
109 static_executor
110 }
111}
112
113/// A static-lifetimed async [`Executor`].
114///
115/// This is primarily intended to be used in [`static`] variables, or types intended to be used, or can be created in non-static
116/// contexts via [`Executor::leak`].
117///
118/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed.
119/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases.
120///
121/// As this type does not implement `Drop`, losing the handle to the executor or failing
122/// to consistently drive the executor with [`StaticExecutor::tick`] or
123/// [`StaticExecutor::run`] will cause the all spawned tasks to permanently leak. Any
124/// tasks at the time will not be cancelled.
125///
126/// [`static`]: https://doc.rust-lang.org/std/keyword.static.html
127#[repr(transparent)]
128pub struct StaticExecutor {
129 state: State,
130}
131
132// SAFETY: Executor stores no thread local state that can be accessed via other thread.
133unsafe impl Send for StaticExecutor {}
134// SAFETY: Executor internally synchronizes all of it's operations internally.
135unsafe impl Sync for StaticExecutor {}
136
137impl UnwindSafe for StaticExecutor {}
138impl RefUnwindSafe for StaticExecutor {}
139
140impl fmt::Debug for StaticExecutor {
141 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142 debug_state(&self.state, "StaticExecutor", f)
143 }
144}
145
146impl StaticExecutor {
147 /// Creates a new StaticExecutor.
148 ///
149 /// # Examples
150 ///
151 /// ```
152 /// use async_executor::StaticExecutor;
153 ///
154 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
155 /// ```
156 pub const fn new() -> Self {
157 Self {
158 state: State::new(),
159 }
160 }
161
162 /// Spawns a task onto the executor.
163 ///
164 /// Note: unlike [`Executor::spawn`], this function requires being called with a `'static`
165 /// borrow on the executor.
166 ///
167 /// # Examples
168 ///
169 /// ```
170 /// use async_executor::StaticExecutor;
171 ///
172 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
173 ///
174 /// let task = EXECUTOR.spawn(async {
175 /// println!("Hello world");
176 /// });
177 /// ```
178 pub fn spawn<T: Send + 'static>(
179 &'static self,
180 future: impl Future<Output = T> + Send + 'static,
181 ) -> Task<T> {
182 let (runnable, task) = Builder::new()
183 .propagate_panic(true)
184 .spawn(|()| future, self.schedule());
185 runnable.schedule();
186 task
187 }
188
189 /// Spawns a non-`'static` task onto the executor.
190 ///
191 /// ## Safety
192 ///
193 /// The caller must ensure that the returned task terminates
194 /// or is cancelled before the end of 'a.
195 pub unsafe fn spawn_scoped<'a, T: Send + 'a>(
196 &'static self,
197 future: impl Future<Output = T> + Send + 'a,
198 ) -> Task<T> {
199 // SAFETY:
200 //
201 // - `future` is `Send`
202 // - `future` is not `'static`, but the caller guarantees that the
203 // task, and thus its `Runnable` must not live longer than `'a`.
204 // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
205 // Therefore we do not need to worry about what is done with the
206 // `Waker`.
207 let (runnable, task) = unsafe {
208 Builder::new()
209 .propagate_panic(true)
210 .spawn_unchecked(|()| future, self.schedule())
211 };
212 runnable.schedule();
213 task
214 }
215
216 /// Attempts to run a task if at least one is scheduled.
217 ///
218 /// Running a scheduled task means simply polling its future once.
219 ///
220 /// # Examples
221 ///
222 /// ```
223 /// use async_executor::StaticExecutor;
224 ///
225 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
226 ///
227 /// assert!(!EXECUTOR.try_tick()); // no tasks to run
228 ///
229 /// let task = EXECUTOR.spawn(async {
230 /// println!("Hello world");
231 /// });
232 ///
233 /// assert!(EXECUTOR.try_tick()); // a task was found
234 /// ```
235 pub fn try_tick(&self) -> bool {
236 self.state.try_tick()
237 }
238
239 /// Runs a single task.
240 ///
241 /// Running a task means simply polling its future once.
242 ///
243 /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
244 ///
245 /// # Examples
246 ///
247 /// ```
248 /// use async_executor::StaticExecutor;
249 /// use futures_lite::future;
250 ///
251 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
252 ///
253 /// let task = EXECUTOR.spawn(async {
254 /// println!("Hello world");
255 /// });
256 ///
257 /// future::block_on(EXECUTOR.tick()); // runs the task
258 /// ```
259 pub async fn tick(&self) {
260 self.state.tick().await;
261 }
262
263 /// Runs the executor until the given future completes.
264 ///
265 /// # Examples
266 ///
267 /// ```
268 /// use async_executor::StaticExecutor;
269 /// use futures_lite::future;
270 ///
271 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
272 ///
273 /// let task = EXECUTOR.spawn(async { 1 + 2 });
274 /// let res = future::block_on(EXECUTOR.run(async { task.await * 2 }));
275 ///
276 /// assert_eq!(res, 6);
277 /// ```
278 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
279 self.state.run(future).await
280 }
281
282 /// Returns a function that schedules a runnable task when it gets woken up.
283 fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
284 let state: &'static State = &self.state;
285 // TODO: If possible, push into the current local queue and notify the ticker.
286 move |runnable| {
287 let result = state.queue.push(runnable);
288 debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail.
289 state.notify();
290 }
291 }
292}
293
294impl Default for StaticExecutor {
295 fn default() -> Self {
296 Self::new()
297 }
298}
299
300/// A static async [`LocalExecutor`] created from [`LocalExecutor::leak`].
301///
302/// This is primarily intended to be used in [`thread_local`] variables, or can be created in non-static
303/// contexts via [`LocalExecutor::leak`].
304///
305/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed.
306/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases.
307///
308/// As this type does not implement `Drop`, losing the handle to the executor or failing
309/// to consistently drive the executor with [`StaticLocalExecutor::tick`] or
310/// [`StaticLocalExecutor::run`] will cause the all spawned tasks to permanently leak. Any
311/// tasks at the time will not be cancelled.
312///
313/// [`thread_local]: https://doc.rust-lang.org/std/macro.thread_local.html
314#[repr(transparent)]
315pub struct StaticLocalExecutor {
316 state: State,
317 marker_: PhantomData<UnsafeCell<()>>,
318}
319
320impl UnwindSafe for StaticLocalExecutor {}
321impl RefUnwindSafe for StaticLocalExecutor {}
322
323impl fmt::Debug for StaticLocalExecutor {
324 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
325 debug_state(&self.state, "StaticLocalExecutor", f)
326 }
327}
328
329impl StaticLocalExecutor {
330 /// Creates a new StaticLocalExecutor.
331 ///
332 /// # Examples
333 ///
334 /// ```
335 /// use async_executor::StaticLocalExecutor;
336 ///
337 /// thread_local! {
338 /// static EXECUTOR: StaticLocalExecutor = StaticLocalExecutor::new();
339 /// }
340 /// ```
341 pub const fn new() -> Self {
342 Self {
343 state: State::new(),
344 marker_: PhantomData,
345 }
346 }
347
348 /// Spawns a task onto the executor.
349 ///
350 /// Note: unlike [`LocalExecutor::spawn`], this function requires being called with a `'static`
351 /// borrow on the executor.
352 ///
353 /// # Examples
354 ///
355 /// ```
356 /// use async_executor::LocalExecutor;
357 ///
358 /// let ex = LocalExecutor::new().leak();
359 ///
360 /// let task = ex.spawn(async {
361 /// println!("Hello world");
362 /// });
363 /// ```
364 pub fn spawn<T: 'static>(&'static self, future: impl Future<Output = T> + 'static) -> Task<T> {
365 let (runnable, task) = Builder::new()
366 .propagate_panic(true)
367 .spawn_local(|()| future, self.schedule());
368 runnable.schedule();
369 task
370 }
371
372 /// Spawns a non-`'static` task onto the executor.
373 ///
374 /// ## Safety
375 ///
376 /// The caller must ensure that the returned task terminates
377 /// or is cancelled before the end of 'a.
378 pub unsafe fn spawn_scoped<'a, T: 'a>(
379 &'static self,
380 future: impl Future<Output = T> + 'a,
381 ) -> Task<T> {
382 // SAFETY:
383 //
384 // - `future` is not `Send` but `StaticLocalExecutor` is `!Sync`,
385 // `try_tick`, `tick` and `run` can only be called from the origin
386 // thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only
387 // be called from the origin thread, ensuring that `future` and the executor
388 // share the same origin thread. The `Runnable` can be scheduled from other
389 // threads, but because of the above `Runnable` can only be called or
390 // dropped on the origin thread.
391 // - `future` is not `'static`, but the caller guarantees that the
392 // task, and thus its `Runnable` must not live longer than `'a`.
393 // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
394 // Therefore we do not need to worry about what is done with the
395 // `Waker`.
396 let (runnable, task) = unsafe {
397 Builder::new()
398 .propagate_panic(true)
399 .spawn_unchecked(|()| future, self.schedule())
400 };
401 runnable.schedule();
402 task
403 }
404
405 /// Attempts to run a task if at least one is scheduled.
406 ///
407 /// Running a scheduled task means simply polling its future once.
408 ///
409 /// # Examples
410 ///
411 /// ```
412 /// use async_executor::LocalExecutor;
413 ///
414 /// let ex = LocalExecutor::new().leak();
415 /// assert!(!ex.try_tick()); // no tasks to run
416 ///
417 /// let task = ex.spawn(async {
418 /// println!("Hello world");
419 /// });
420 /// assert!(ex.try_tick()); // a task was found
421 /// ```
422 pub fn try_tick(&self) -> bool {
423 self.state.try_tick()
424 }
425
426 /// Runs a single task.
427 ///
428 /// Running a task means simply polling its future once.
429 ///
430 /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
431 ///
432 /// # Examples
433 ///
434 /// ```
435 /// use async_executor::LocalExecutor;
436 /// use futures_lite::future;
437 ///
438 /// let ex = LocalExecutor::new().leak();
439 ///
440 /// let task = ex.spawn(async {
441 /// println!("Hello world");
442 /// });
443 /// future::block_on(ex.tick()); // runs the task
444 /// ```
445 pub async fn tick(&self) {
446 self.state.tick().await;
447 }
448
449 /// Runs the executor until the given future completes.
450 ///
451 /// # Examples
452 ///
453 /// ```
454 /// use async_executor::LocalExecutor;
455 /// use futures_lite::future;
456 ///
457 /// let ex = LocalExecutor::new().leak();
458 ///
459 /// let task = ex.spawn(async { 1 + 2 });
460 /// let res = future::block_on(ex.run(async { task.await * 2 }));
461 ///
462 /// assert_eq!(res, 6);
463 /// ```
464 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
465 self.state.run(future).await
466 }
467
468 /// Returns a function that schedules a runnable task when it gets woken up.
469 fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
470 let state: &'static State = &self.state;
471 // TODO: If possible, push into the current local queue and notify the ticker.
472 move |runnable| {
473 let result = state.queue.push(runnable);
474 debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail.
475 state.notify();
476 }
477 }
478}
479
480impl Default for StaticLocalExecutor {
481 fn default() -> Self {
482 Self::new()
483 }
484}