async_executor/static_executors.rs
1use crate::{debug_state, Executor, LocalExecutor, State};
2use async_task::{Builder, Runnable, Task};
3use slab::Slab;
4use std::{
5 cell::UnsafeCell,
6 fmt,
7 future::Future,
8 marker::PhantomData,
9 panic::{RefUnwindSafe, UnwindSafe},
10};
11
12impl Executor<'static> {
13 /// Consumes the [`Executor`] and intentionally leaks it.
14 ///
15 /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
16 /// [`StaticExecutor`]'s functions are optimized to require fewer synchronizing operations
17 /// when spawning, running, and finishing tasks.
18 ///
19 /// `StaticExecutor` cannot be converted back into a `Executor`, so this operation is
20 /// irreversible without the use of unsafe.
21 ///
22 /// # Example
23 ///
24 /// ```
25 /// use async_executor::Executor;
26 /// use futures_lite::future;
27 ///
28 /// let ex = Executor::new().leak();
29 ///
30 /// let task = ex.spawn(async {
31 /// println!("Hello world");
32 /// });
33 ///
34 /// future::block_on(ex.run(task));
35 /// ```
36 pub fn leak(self) -> &'static StaticExecutor {
37 let ptr = self.state_ptr();
38 // SAFETY: So long as an Executor lives, it's state pointer will always be valid
39 // when accessed through state_ptr. This executor will live for the full 'static
40 // lifetime so this isn't an arbitrary lifetime extension.
41 let state: &'static State = unsafe { &*ptr };
42
43 std::mem::forget(self);
44
45 let mut active = state.active.lock().unwrap();
46 if !active.is_empty() {
47 // Reschedule all of the active tasks.
48 for waker in active.drain() {
49 waker.wake();
50 }
51 // Overwrite to ensure that the slab is deallocated.
52 *active = Slab::new();
53 }
54
55 // SAFETY: StaticExecutor has the same memory layout as State as it's repr(transparent).
56 // The lifetime is not altered: 'static -> 'static.
57 let static_executor: &'static StaticExecutor = unsafe { std::mem::transmute(state) };
58 static_executor
59 }
60}
61
62impl LocalExecutor<'static> {
63 /// Consumes the [`LocalExecutor`] and intentionally leaks it.
64 ///
65 /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
66 /// [`StaticLocalExecutor`]'s functions are optimized to require fewer synchronizing operations
67 /// when spawning, running, and finishing tasks.
68 ///
69 /// `StaticLocalExecutor` cannot be converted back into a `Executor`, so this operation is
70 /// irreversible without the use of unsafe.
71 ///
72 /// # Example
73 ///
74 /// ```
75 /// use async_executor::LocalExecutor;
76 /// use futures_lite::future;
77 ///
78 /// let ex = LocalExecutor::new().leak();
79 ///
80 /// let task = ex.spawn(async {
81 /// println!("Hello world");
82 /// });
83 ///
84 /// future::block_on(ex.run(task));
85 /// ```
86 pub fn leak(self) -> &'static StaticLocalExecutor {
87 let ptr = self.inner.state_ptr();
88 // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid
89 // when accessed through state_ptr. This executor will live for the full 'static
90 // lifetime so this isn't an arbitrary lifetime extension.
91 let state: &'static State = unsafe { &*ptr };
92
93 std::mem::forget(self);
94
95 let mut active = state.active.lock().unwrap();
96 if !active.is_empty() {
97 // Reschedule all of the active tasks.
98 for waker in active.drain() {
99 waker.wake();
100 }
101 // Overwrite to ensure that the slab is deallocated.
102 *active = Slab::new();
103 }
104
105 // SAFETY: StaticLocalExecutor has the same memory layout as State as it's repr(transparent).
106 // The lifetime is not altered: 'static -> 'static.
107 let static_executor: &'static StaticLocalExecutor = unsafe { std::mem::transmute(state) };
108 static_executor
109 }
110}
111
112/// A static-lifetimed async [`Executor`].
113///
114/// This is primarily intended to be used in [`static`] variables, or types intended to be used, or can be created in non-static
115/// contexts via [`Executor::leak`].
116///
117/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed.
118/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases.
119///
120/// As this type does not implement `Drop`, losing the handle to the executor or failing
121/// to consistently drive the executor with [`StaticExecutor::tick`] or
122/// [`StaticExecutor::run`] will cause the all spawned tasks to permanently leak. Any
123/// tasks at the time will not be cancelled.
124///
125/// [`static`]: https://doc.rust-lang.org/std/keyword.static.html
126#[repr(transparent)]
127pub struct StaticExecutor {
128 state: State,
129}
130
131// SAFETY: Executor stores no thread local state that can be accessed via other thread.
132unsafe impl Send for StaticExecutor {}
133// SAFETY: Executor internally synchronizes all of it's operations internally.
134unsafe impl Sync for StaticExecutor {}
135
136impl UnwindSafe for StaticExecutor {}
137impl RefUnwindSafe for StaticExecutor {}
138
139impl fmt::Debug for StaticExecutor {
140 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
141 debug_state(&self.state, "StaticExecutor", f)
142 }
143}
144
145impl StaticExecutor {
146 /// Creates a new StaticExecutor.
147 ///
148 /// # Examples
149 ///
150 /// ```
151 /// use async_executor::StaticExecutor;
152 ///
153 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
154 /// ```
155 pub const fn new() -> Self {
156 Self {
157 state: State::new(),
158 }
159 }
160
161 /// Spawns a task onto the executor.
162 ///
163 /// Note: unlike [`Executor::spawn`], this function requires being called with a `'static`
164 /// borrow on the executor.
165 ///
166 /// # Examples
167 ///
168 /// ```
169 /// use async_executor::StaticExecutor;
170 ///
171 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
172 ///
173 /// let task = EXECUTOR.spawn(async {
174 /// println!("Hello world");
175 /// });
176 /// ```
177 pub fn spawn<T: Send + 'static>(
178 &'static self,
179 future: impl Future<Output = T> + Send + 'static,
180 ) -> Task<T> {
181 let (runnable, task) = Builder::new()
182 .propagate_panic(true)
183 .spawn(|()| future, self.schedule());
184 runnable.schedule();
185 task
186 }
187
188 /// Spawns a non-`'static` task onto the executor.
189 ///
190 /// ## Safety
191 ///
192 /// The caller must ensure that the returned task terminates
193 /// or is cancelled before the end of 'a.
194 pub unsafe fn spawn_scoped<'a, T: Send + 'a>(
195 &'static self,
196 future: impl Future<Output = T> + Send + 'a,
197 ) -> Task<T> {
198 // SAFETY:
199 //
200 // - `future` is `Send`
201 // - `future` is not `'static`, but the caller guarantees that the
202 // task, and thus its `Runnable` must not live longer than `'a`.
203 // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
204 // Therefore we do not need to worry about what is done with the
205 // `Waker`.
206 let (runnable, task) = unsafe {
207 Builder::new()
208 .propagate_panic(true)
209 .spawn_unchecked(|()| future, self.schedule())
210 };
211 runnable.schedule();
212 task
213 }
214
215 /// Attempts to run a task if at least one is scheduled.
216 ///
217 /// Running a scheduled task means simply polling its future once.
218 ///
219 /// # Examples
220 ///
221 /// ```
222 /// use async_executor::StaticExecutor;
223 ///
224 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
225 ///
226 /// assert!(!EXECUTOR.try_tick()); // no tasks to run
227 ///
228 /// let task = EXECUTOR.spawn(async {
229 /// println!("Hello world");
230 /// });
231 ///
232 /// assert!(EXECUTOR.try_tick()); // a task was found
233 /// ```
234 pub fn try_tick(&self) -> bool {
235 self.state.try_tick()
236 }
237
238 /// Runs a single task.
239 ///
240 /// Running a task means simply polling its future once.
241 ///
242 /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
243 ///
244 /// # Examples
245 ///
246 /// ```
247 /// use async_executor::StaticExecutor;
248 /// use futures_lite::future;
249 ///
250 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
251 ///
252 /// let task = EXECUTOR.spawn(async {
253 /// println!("Hello world");
254 /// });
255 ///
256 /// future::block_on(EXECUTOR.tick()); // runs the task
257 /// ```
258 pub async fn tick(&self) {
259 self.state.tick().await;
260 }
261
262 /// Runs the executor until the given future completes.
263 ///
264 /// # Examples
265 ///
266 /// ```
267 /// use async_executor::StaticExecutor;
268 /// use futures_lite::future;
269 ///
270 /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
271 ///
272 /// let task = EXECUTOR.spawn(async { 1 + 2 });
273 /// let res = future::block_on(EXECUTOR.run(async { task.await * 2 }));
274 ///
275 /// assert_eq!(res, 6);
276 /// ```
277 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
278 self.state.run(future).await
279 }
280
281 /// Returns a function that schedules a runnable task when it gets woken up.
282 fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
283 let state: &'static State = &self.state;
284 // TODO: If possible, push into the current local queue and notify the ticker.
285 move |runnable| {
286 state.queue.push(runnable).unwrap();
287 state.notify();
288 }
289 }
290}
291
292impl Default for StaticExecutor {
293 fn default() -> Self {
294 Self::new()
295 }
296}
297
298/// A static async [`LocalExecutor`] created from [`LocalExecutor::leak`].
299///
300/// This is primarily intended to be used in [`thread_local`] variables, or can be created in non-static
301/// contexts via [`LocalExecutor::leak`].
302///
303/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed.
304/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases.
305///
306/// As this type does not implement `Drop`, losing the handle to the executor or failing
307/// to consistently drive the executor with [`StaticLocalExecutor::tick`] or
308/// [`StaticLocalExecutor::run`] will cause the all spawned tasks to permanently leak. Any
309/// tasks at the time will not be cancelled.
310///
311/// [`thread_local]: https://doc.rust-lang.org/std/macro.thread_local.html
312#[repr(transparent)]
313pub struct StaticLocalExecutor {
314 state: State,
315 marker_: PhantomData<UnsafeCell<()>>,
316}
317
318impl UnwindSafe for StaticLocalExecutor {}
319impl RefUnwindSafe for StaticLocalExecutor {}
320
321impl fmt::Debug for StaticLocalExecutor {
322 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
323 debug_state(&self.state, "StaticLocalExecutor", f)
324 }
325}
326
327impl StaticLocalExecutor {
328 /// Creates a new StaticLocalExecutor.
329 ///
330 /// # Examples
331 ///
332 /// ```
333 /// use async_executor::StaticLocalExecutor;
334 ///
335 /// thread_local! {
336 /// static EXECUTOR: StaticLocalExecutor = StaticLocalExecutor::new();
337 /// }
338 /// ```
339 pub const fn new() -> Self {
340 Self {
341 state: State::new(),
342 marker_: PhantomData,
343 }
344 }
345
346 /// Spawns a task onto the executor.
347 ///
348 /// Note: unlike [`LocalExecutor::spawn`], this function requires being called with a `'static`
349 /// borrow on the executor.
350 ///
351 /// # Examples
352 ///
353 /// ```
354 /// use async_executor::LocalExecutor;
355 ///
356 /// let ex = LocalExecutor::new().leak();
357 ///
358 /// let task = ex.spawn(async {
359 /// println!("Hello world");
360 /// });
361 /// ```
362 pub fn spawn<T: 'static>(&'static self, future: impl Future<Output = T> + 'static) -> Task<T> {
363 let (runnable, task) = Builder::new()
364 .propagate_panic(true)
365 .spawn_local(|()| future, self.schedule());
366 runnable.schedule();
367 task
368 }
369
370 /// Spawns a non-`'static` task onto the executor.
371 ///
372 /// ## Safety
373 ///
374 /// The caller must ensure that the returned task terminates
375 /// or is cancelled before the end of 'a.
376 pub unsafe fn spawn_scoped<'a, T: 'a>(
377 &'static self,
378 future: impl Future<Output = T> + 'a,
379 ) -> Task<T> {
380 // SAFETY:
381 //
382 // - `future` is not `Send` but `StaticLocalExecutor` is `!Sync`,
383 // `try_tick`, `tick` and `run` can only be called from the origin
384 // thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only
385 // be called from the origin thread, ensuring that `future` and the executor
386 // share the same origin thread. The `Runnable` can be scheduled from other
387 // threads, but because of the above `Runnable` can only be called or
388 // dropped on the origin thread.
389 // - `future` is not `'static`, but the caller guarantees that the
390 // task, and thus its `Runnable` must not live longer than `'a`.
391 // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
392 // Therefore we do not need to worry about what is done with the
393 // `Waker`.
394 let (runnable, task) = unsafe {
395 Builder::new()
396 .propagate_panic(true)
397 .spawn_unchecked(|()| future, self.schedule())
398 };
399 runnable.schedule();
400 task
401 }
402
403 /// Attempts to run a task if at least one is scheduled.
404 ///
405 /// Running a scheduled task means simply polling its future once.
406 ///
407 /// # Examples
408 ///
409 /// ```
410 /// use async_executor::LocalExecutor;
411 ///
412 /// let ex = LocalExecutor::new().leak();
413 /// assert!(!ex.try_tick()); // no tasks to run
414 ///
415 /// let task = ex.spawn(async {
416 /// println!("Hello world");
417 /// });
418 /// assert!(ex.try_tick()); // a task was found
419 /// ```
420 pub fn try_tick(&self) -> bool {
421 self.state.try_tick()
422 }
423
424 /// Runs a single task.
425 ///
426 /// Running a task means simply polling its future once.
427 ///
428 /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
429 ///
430 /// # Examples
431 ///
432 /// ```
433 /// use async_executor::LocalExecutor;
434 /// use futures_lite::future;
435 ///
436 /// let ex = LocalExecutor::new().leak();
437 ///
438 /// let task = ex.spawn(async {
439 /// println!("Hello world");
440 /// });
441 /// future::block_on(ex.tick()); // runs the task
442 /// ```
443 pub async fn tick(&self) {
444 self.state.tick().await;
445 }
446
447 /// Runs the executor until the given future completes.
448 ///
449 /// # Examples
450 ///
451 /// ```
452 /// use async_executor::LocalExecutor;
453 /// use futures_lite::future;
454 ///
455 /// let ex = LocalExecutor::new().leak();
456 ///
457 /// let task = ex.spawn(async { 1 + 2 });
458 /// let res = future::block_on(ex.run(async { task.await * 2 }));
459 ///
460 /// assert_eq!(res, 6);
461 /// ```
462 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
463 self.state.run(future).await
464 }
465
466 /// Returns a function that schedules a runnable task when it gets woken up.
467 fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
468 let state: &'static State = &self.state;
469 // TODO: If possible, push into the current local queue and notify the ticker.
470 move |runnable| {
471 state.queue.push(runnable).unwrap();
472 state.notify();
473 }
474 }
475}
476
477impl Default for StaticLocalExecutor {
478 fn default() -> Self {
479 Self::new()
480 }
481}