async_executor/lib.rs
//! Async executors.
//!
//! This crate provides two reference executors that trade performance for
//! functionality. They should be considered "good enough" for most use cases.
//! For more specialized use cases, consider writing your own executor on top
//! of [`async-task`].
//!
//! [`async-task`]: https://crates.io/crates/async-task
//!
//! # Examples
//!
//! ```
//! use async_executor::Executor;
//! use futures_lite::future;
//!
//! // Create a new executor.
//! let ex = Executor::new();
//!
//! // Spawn a task.
//! let task = ex.spawn(async {
//!     println!("Hello world");
//! });
//!
//! // Run the executor until the task completes.
//! future::block_on(ex.run(task));
//! ```

#![warn(
    missing_docs,
    missing_debug_implementations,
    rust_2018_idioms,
    clippy::undocumented_unsafe_blocks
)]
#![doc(
    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![doc(
    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![allow(clippy::unused_unit)] // false positive fixed in Rust 1.89

extern crate alloc;

use alloc::rc::Rc;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::fmt;
use core::marker::PhantomData;
use core::panic::{RefUnwindSafe, UnwindSafe};
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use core::task::{Context, Poll, Waker};
use std::sync::{Mutex, MutexGuard, PoisonError, RwLock, TryLockError};

use async_task::{Builder, Runnable};
use concurrent_queue::ConcurrentQueue;
use futures_lite::{future, prelude::*};
use pin_project_lite::pin_project;
use slab::Slab;

#[cfg(feature = "static")]
mod static_executors;

#[doc(no_inline)]
pub use async_task::{FallibleTask, Task};
#[cfg(feature = "static")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))]
pub use static_executors::*;

/// An async executor.
///
/// # Examples
///
/// A multi-threaded executor:
///
/// ```
/// use async_channel::unbounded;
/// use async_executor::Executor;
/// use easy_parallel::Parallel;
/// use futures_lite::future;
///
/// let ex = Executor::new();
/// let (signal, shutdown) = unbounded::<()>();
///
/// Parallel::new()
///     // Run four executor threads.
///     .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
///     // Run the main future on the current thread.
///     .finish(|| future::block_on(async {
///         println!("Hello world!");
///         drop(signal);
///     }));
/// ```
pub struct Executor<'a> {
    /// The executor state.
    state: AtomicPtr<State>,

    /// Makes the `'a` lifetime invariant.
    _marker: PhantomData<core::cell::UnsafeCell<&'a ()>>,
}

// SAFETY: Executor stores no thread-local state that can be accessed from other threads.
unsafe impl Send for Executor<'_> {}
// SAFETY: Executor synchronizes all of its operations internally.
unsafe impl Sync for Executor<'_> {}

impl UnwindSafe for Executor<'_> {}
impl RefUnwindSafe for Executor<'_> {}

impl fmt::Debug for Executor<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        debug_executor(self, "Executor", f)
    }
}

impl<'a> Executor<'a> {
    /// Creates a new executor.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    ///
    /// let ex = Executor::new();
    /// ```
    pub const fn new() -> Self {
        Self {
            state: AtomicPtr::new(core::ptr::null_mut()),
            _marker: PhantomData,
        }
    }

    /// Returns `true` if there are no unfinished tasks.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    ///
    /// let ex = Executor::new();
    /// assert!(ex.is_empty());
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// assert!(!ex.is_empty());
    ///
    /// assert!(ex.try_tick());
    /// assert!(ex.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.state().active().is_empty()
    }

    /// Spawns a task onto the executor.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    ///
    /// let ex = Executor::new();
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// ```
    pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
        let state = self.state();
        let mut active = state.active();

        // SAFETY: `T` and the future are `Send`.
        unsafe { Self::spawn_inner(state, future, &mut active) }
    }

    /// Spawns many tasks onto the executor.
    ///
    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
    /// spawns all of the tasks in one go. With large numbers of tasks this can reduce
    /// contention.
    ///
    /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
    /// prevent runner thread starvation. It is assumed that the iterator provided does not
    /// block; blocking iterators can lock up the internal mutex and therefore the entire
    /// executor.
    ///
    /// ## Example
    ///
    /// ```
    /// use async_executor::Executor;
    /// use futures_lite::{stream, prelude::*};
    /// use core::future::ready;
    ///
    /// # futures_lite::future::block_on(async {
    /// let mut ex = Executor::new();
    ///
    /// let futures = [
    ///     ready(1),
    ///     ready(2),
    ///     ready(3)
    /// ];
    ///
    /// // Spawn all of the futures onto the executor at once.
    /// let mut tasks = vec![];
    /// ex.spawn_many(futures, &mut tasks);
    ///
    /// // Await all of them.
    /// let results = ex.run(async move {
    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
    /// }).await;
    /// assert_eq!(results, [1, 2, 3]);
    /// # });
    /// ```
    ///
    /// [`spawn`]: Executor::spawn
    pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
        &self,
        futures: impl IntoIterator<Item = F>,
        handles: &mut impl Extend<Task<F::Output>>,
    ) {
        let state = self.state();
        let mut active = Some(state.as_ref().active());

        // Convert the futures into tasks.
        let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
            // SAFETY: `T` and the future are `Send`.
            let task = unsafe { Self::spawn_inner(state, future, active.as_mut().unwrap()) };

            // Yield the lock every once in a while to ease contention.
            if i.wrapping_sub(1) % 500 == 0 {
                drop(active.take());
                active = Some(self.state().active());
            }

            task
        });

        // Push the tasks to the user's collection.
        handles.extend(tasks);
    }

    /// Spawn a future while holding the inner lock.
    ///
    /// # Safety
    ///
    /// If this is an `Executor`, `F` and `T` must be `Send`.
    unsafe fn spawn_inner<T: 'a>(
        state: Pin<&'a State>,
        future: impl Future<Output = T> + 'a,
        active: &mut Slab<Waker>,
    ) -> Task<T> {
        // Remove the task from the set of active tasks when the future finishes.
        let entry = active.vacant_entry();
        let index = entry.key();
        let future = AsyncCallOnDrop::new(future, move || drop(state.active().try_remove(index)));

        // Create the task and register it in the set of active tasks.
        //
        // SAFETY:
        //
        // If `future` is not `Send`, this must be a `LocalExecutor` as per this
        // function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
        // `try_tick`, `tick` and `run` can only be called from the origin
        // thread of the `LocalExecutor`. Similarly, `spawn` can only be called
        // from the origin thread, ensuring that `future` and the executor share
        // the same origin thread. The `Runnable` can be scheduled from other
        // threads, but because of the above `Runnable` can only be called or
        // dropped on the origin thread.
        //
        // `future` is not `'static`, but we make sure that the `Runnable` does
        // not outlive `'a`. When the executor is dropped, the `active` field is
        // drained and all of the `Waker`s are woken. Then, the queue inside of
        // the `Executor` is drained of all of its runnables. This ensures that
        // runnables are dropped and this precondition is satisfied.
        //
        // `Self::schedule` is `Send` and `Sync`, as checked below.
        // Therefore we do not need to worry about which thread the `Waker` is used
        // and dropped on.
        //
        // `Self::schedule` may not be `'static`, but we make sure that the `Waker` does
        // not outlive `'a`. When the executor is dropped, the `active` field is
        // drained and all of the `Waker`s are woken.
        let (runnable, task) = Builder::new()
            .propagate_panic(true)
            .spawn_unchecked(|()| future, Self::schedule(state));
        entry.insert(runnable.waker());

        runnable.schedule();
        task
    }

    /// Attempts to run a task if at least one is scheduled.
    ///
    /// Running a scheduled task means simply polling its future once.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    ///
    /// let ex = Executor::new();
    /// assert!(!ex.try_tick()); // no tasks to run
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// assert!(ex.try_tick()); // a task was found
    /// ```
    pub fn try_tick(&self) -> bool {
        self.state().try_tick()
    }

    /// Runs a single task.
    ///
    /// Running a task means simply polling its future once.
    ///
    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    /// use futures_lite::future;
    ///
    /// let ex = Executor::new();
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// future::block_on(ex.tick()); // runs the task
    /// ```
    pub async fn tick(&self) {
        self.state().tick().await;
    }

    /// Runs the executor until the given future completes.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    /// use futures_lite::future;
    ///
    /// let ex = Executor::new();
    ///
    /// let task = ex.spawn(async { 1 + 2 });
    /// let res = future::block_on(ex.run(async { task.await * 2 }));
    ///
    /// assert_eq!(res, 6);
    /// ```
    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
        self.state().run(future).await
    }

    /// Returns a function that schedules a runnable task when it gets woken up.
    fn schedule(state: Pin<&'a State>) -> impl Fn(Runnable) + Send + Sync + 'a {
        // TODO: If possible, push into the current local queue and notify the ticker.
        move |runnable| {
            let result = state.queue.push(runnable);
            debug_assert!(result.is_ok()); // Since we use an unbounded queue, push will never fail.
            state.notify();
        }
    }

    /// Returns a pointer to the inner state.
    #[inline]
    fn state(&self) -> Pin<&'a State> {
        #[cold]
        fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State {
            let state = Arc::new(State::new());
            let ptr = Arc::into_raw(state).cast_mut();
            if let Err(actual) = atomic_ptr.compare_exchange(
                core::ptr::null_mut(),
                ptr,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                // SAFETY: This was just created from Arc::into_raw.
                drop(unsafe { Arc::from_raw(ptr) });
                actual
            } else {
                ptr
            }
        }

        let mut ptr = self.state.load(Ordering::Acquire);
        if ptr.is_null() {
            ptr = alloc_state(&self.state);
        }

        // SAFETY: So long as an Executor lives, its state pointer will always be valid
        // and will never be moved until it is dropped.
        Pin::new(unsafe { &*ptr })
    }
}

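// A hedged sanity-check sketch, not part of the upstream crate (module name is illustrative):
// it exercises the lazy allocation performed by `Executor::state` above, i.e. `Executor::new`
// stores a null pointer and the state is only allocated on first use.
#[cfg(test)]
mod lazy_state_sketch {
    use super::*;

    #[test]
    fn state_is_allocated_on_first_use() {
        let ex = Executor::new();
        // No state has been allocated yet.
        assert!(ex.state.load(Ordering::Acquire).is_null());

        // Spawning a task forces the state to be allocated via `Executor::state`.
        ex.spawn(async {}).detach();
        assert!(!ex.state.load(Ordering::Acquire).is_null());
    }
}
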
impl Drop for Executor<'_> {
    fn drop(&mut self) {
        let ptr = *self.state.get_mut();
        if ptr.is_null() {
            return;
        }

        // SAFETY: As ptr is not null, it was allocated via Arc::new and converted
        // via Arc::into_raw in alloc_state.
        let state = unsafe { Arc::from_raw(ptr) };

        let mut active = state.pin().active();
        for w in active.drain() {
            w.wake();
        }
        drop(active);

        while state.queue.pop().is_ok() {}
    }
}

impl<'a> Default for Executor<'a> {
    fn default() -> Self {
        Self::new()
    }
}

/// A thread-local executor.
///
/// The executor can only be run on the thread that created it.
///
/// # Examples
///
/// ```
/// use async_executor::LocalExecutor;
/// use futures_lite::future;
///
/// let local_ex = LocalExecutor::new();
///
/// future::block_on(local_ex.run(async {
///     println!("Hello world!");
/// }));
/// ```
pub struct LocalExecutor<'a> {
    /// The inner executor.
    inner: Executor<'a>,

    /// Makes the type `!Send` and `!Sync`.
    _marker: PhantomData<Rc<()>>,
}

impl UnwindSafe for LocalExecutor<'_> {}
impl RefUnwindSafe for LocalExecutor<'_> {}

impl fmt::Debug for LocalExecutor<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        debug_executor(&self.inner, "LocalExecutor", f)
    }
}

impl<'a> LocalExecutor<'a> {
    /// Creates a single-threaded executor.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    ///
    /// let local_ex = LocalExecutor::new();
    /// ```
    pub const fn new() -> Self {
        Self {
            inner: Executor::new(),
            _marker: PhantomData,
        }
    }

    /// Returns `true` if there are no unfinished tasks.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    ///
    /// let local_ex = LocalExecutor::new();
    /// assert!(local_ex.is_empty());
    ///
    /// let task = local_ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// assert!(!local_ex.is_empty());
    ///
    /// assert!(local_ex.try_tick());
    /// assert!(local_ex.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.inner().is_empty()
    }

    /// Spawns a task onto the executor.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    ///
    /// let local_ex = LocalExecutor::new();
    ///
    /// let task = local_ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// ```
    pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
        let state = self.inner().state();
        let mut active = state.active();

        // SAFETY: This executor is not thread safe, so the future and its result
        // cannot be sent to another thread.
        unsafe { Executor::spawn_inner(state, future, &mut active) }
    }

    /// Spawns many tasks onto the executor.
    ///
    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
    /// spawns all of the tasks in one go. With large numbers of tasks this can reduce
    /// contention.
    ///
    /// It is assumed that the iterator provided does not block; blocking iterators can lock up
    /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn_many`], the
    /// mutex is not released, as there are no other threads that can poll this executor.
    ///
    /// ## Example
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    /// use futures_lite::{stream, prelude::*};
    /// use core::future::ready;
    ///
    /// # futures_lite::future::block_on(async {
    /// let mut ex = LocalExecutor::new();
    ///
    /// let futures = [
    ///     ready(1),
    ///     ready(2),
    ///     ready(3)
    /// ];
    ///
    /// // Spawn all of the futures onto the executor at once.
    /// let mut tasks = vec![];
    /// ex.spawn_many(futures, &mut tasks);
    ///
    /// // Await all of them.
    /// let results = ex.run(async move {
    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
    /// }).await;
    /// assert_eq!(results, [1, 2, 3]);
    /// # });
    /// ```
    ///
    /// [`spawn`]: LocalExecutor::spawn
    pub fn spawn_many<T: 'a, F: Future<Output = T> + 'a>(
        &self,
        futures: impl IntoIterator<Item = F>,
        handles: &mut impl Extend<Task<F::Output>>,
    ) {
        let state = self.inner().state();
        let mut active = state.active();

        // Convert all of the futures to tasks.
        let tasks = futures.into_iter().map(|future| {
            // SAFETY: This executor is not thread safe, so the future and its result
            // cannot be sent to another thread.
            unsafe { Executor::spawn_inner(state, future, &mut active) }

            // As only one thread can spawn or poll tasks at a time, there is no need
            // to release the lock here to ease contention.
        });

        // Push them to the user's collection.
        handles.extend(tasks);
    }

    /// Attempts to run a task if at least one is scheduled.
    ///
    /// Running a scheduled task means simply polling its future once.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    ///
    /// let ex = LocalExecutor::new();
    /// assert!(!ex.try_tick()); // no tasks to run
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// assert!(ex.try_tick()); // a task was found
    /// ```
    pub fn try_tick(&self) -> bool {
        self.inner().try_tick()
    }

    /// Runs a single task.
    ///
    /// Running a task means simply polling its future once.
    ///
    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    /// use futures_lite::future;
    ///
    /// let ex = LocalExecutor::new();
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// future::block_on(ex.tick()); // runs the task
    /// ```
    pub async fn tick(&self) {
        self.inner().tick().await
    }

    /// Runs the executor until the given future completes.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    /// use futures_lite::future;
    ///
    /// let local_ex = LocalExecutor::new();
    ///
    /// let task = local_ex.spawn(async { 1 + 2 });
    /// let res = future::block_on(local_ex.run(async { task.await * 2 }));
    ///
    /// assert_eq!(res, 6);
    /// ```
    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
        self.inner().run(future).await
    }

    /// Returns a reference to the inner executor.
    fn inner(&self) -> &Executor<'a> {
        &self.inner
    }
}

impl<'a> Default for LocalExecutor<'a> {
    fn default() -> Self {
        Self::new()
    }
}

/// The state of an executor.
struct State {
    /// The global queue.
    queue: ConcurrentQueue<Runnable>,

    /// Local queues created by runners.
    local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,

    /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
    notified: AtomicBool,

    /// A list of sleeping tickers.
    sleepers: Mutex<Sleepers>,

    /// Currently active tasks.
    active: Mutex<Slab<Waker>>,
}

impl State {
    /// Creates state for a new executor.
    const fn new() -> Self {
        Self {
            queue: ConcurrentQueue::unbounded(),
            local_queues: RwLock::new(Vec::new()),
            notified: AtomicBool::new(true),
            sleepers: Mutex::new(Sleepers {
                count: 0,
                wakers: Vec::new(),
                free_ids: Vec::new(),
            }),
            active: Mutex::new(Slab::new()),
        }
    }

    fn pin(&self) -> Pin<&Self> {
        Pin::new(self)
    }

    /// Returns a reference to currently active tasks.
    fn active(self: Pin<&Self>) -> MutexGuard<'_, Slab<Waker>> {
        self.get_ref()
            .active
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
    }

    /// Notifies a sleeping ticker.
    #[inline]
    fn notify(&self) {
        if self
            .notified
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            let waker = self
                .sleepers
                .lock()
                .unwrap_or_else(PoisonError::into_inner)
                .notify();
            if let Some(w) = waker {
                w.wake();
            }
        }
    }

    pub(crate) fn try_tick(&self) -> bool {
        match self.queue.pop() {
            Err(_) => false,
            Ok(runnable) => {
                // Notify another ticker now to pick up where this ticker left off, just in case
                // running the task takes a long time.
                self.notify();

                // Run the task.
                runnable.run();
                true
            }
        }
    }

    pub(crate) async fn tick(&self) {
        let runnable = Ticker::new(self).runnable().await;
        runnable.run();
    }

    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
        let mut runner = Runner::new(self);
        let mut rng = fastrand::Rng::new();

        // A future that runs tasks forever.
        let run_forever = async {
            loop {
                for _ in 0..200 {
                    let runnable = runner.runnable(&mut rng).await;
                    runnable.run();
                }
                future::yield_now().await;
            }
        };

        // Run `future` and `run_forever` concurrently until `future` completes.
        future.or(run_forever).await
    }
}

/// A list of sleeping tickers.
struct Sleepers {
    /// Number of sleeping tickers (both notified and unnotified).
    count: usize,

    /// IDs and wakers of sleeping unnotified tickers.
    ///
    /// A sleeping ticker is notified when its waker is missing from this list.
    wakers: Vec<(usize, Waker)>,

    /// Reclaimed IDs.
    free_ids: Vec<usize>,
}

impl Sleepers {
    /// Inserts a new sleeping ticker.
    fn insert(&mut self, waker: &Waker) -> usize {
        let id = match self.free_ids.pop() {
            Some(id) => id,
            None => self.count + 1,
        };
        self.count += 1;
        self.wakers.push((id, waker.clone()));
        id
    }

    /// Re-inserts a sleeping ticker's waker if it was notified.
    ///
    /// Returns `true` if the ticker was notified.
    fn update(&mut self, id: usize, waker: &Waker) -> bool {
        for item in &mut self.wakers {
            if item.0 == id {
                item.1.clone_from(waker);
                return false;
            }
        }

        self.wakers.push((id, waker.clone()));
        true
    }

    /// Removes a previously inserted sleeping ticker.
    ///
    /// Returns `true` if the ticker was notified.
    fn remove(&mut self, id: usize) -> bool {
        self.count -= 1;
        self.free_ids.push(id);

        for i in (0..self.wakers.len()).rev() {
            if self.wakers[i].0 == id {
                self.wakers.remove(i);
                return false;
            }
        }
        true
    }

    /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
    fn is_notified(&self) -> bool {
        self.count == 0 || self.count > self.wakers.len()
    }

    /// Returns notification waker for a sleeping ticker.
    ///
    /// If a ticker was notified already or there are no tickers, `None` will be returned.
    fn notify(&mut self) -> Option<Waker> {
        if self.wakers.len() == self.count {
            self.wakers.pop().map(|item| item.1)
        } else {
            None
        }
    }
}

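// A hedged sketch, not part of the upstream crate (module name is illustrative), of the
// notification protocol documented above: a sleeper counts as notified once its waker has been
// taken out of `wakers`.
#[cfg(test)]
mod sleepers_sketch {
    use super::*;

    #[test]
    fn notify_marks_a_sleeper_as_notified() {
        future::block_on(future::poll_fn(|cx| {
            let mut sleepers = Sleepers {
                count: 0,
                wakers: Vec::new(),
                free_ids: Vec::new(),
            };

            // A freshly inserted sleeper still has its waker in the list, so nothing is notified.
            let id = sleepers.insert(cx.waker());
            assert!(!sleepers.is_notified());

            // `notify` hands the waker out, which is what marks the sleeper as notified.
            assert!(sleepers.notify().is_some());
            assert!(sleepers.is_notified());

            // `remove` reports whether the sleeper had been notified before it was removed.
            assert!(sleepers.remove(id));

            Poll::Ready(())
        }));
    }
}
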
/// Runs tasks one by one.
struct Ticker<'a> {
    /// The executor state.
    state: &'a State,

    /// Set to a non-zero sleeper ID when in sleeping state.
    ///
    /// States a ticker can be in:
    /// 1) Woken.
    /// 2a) Sleeping and unnotified.
    /// 2b) Sleeping and notified.
    sleeping: usize,
}

impl<'a> Ticker<'a> {
    /// Creates a ticker.
    fn new(state: &'a State) -> Self {
        Self { state, sleeping: 0 }
    }

    /// Moves the ticker into sleeping and unnotified state.
    ///
    /// Returns `false` if the ticker was already sleeping and unnotified.
    fn sleep(&mut self, waker: &Waker) -> bool {
        let mut sleepers = self
            .state
            .sleepers
            .lock()
            .unwrap_or_else(PoisonError::into_inner);

        match self.sleeping {
            // Move to sleeping state.
            0 => {
                self.sleeping = sleepers.insert(waker);
            }

            // Already sleeping, check if notified.
            id => {
                if !sleepers.update(id, waker) {
                    return false;
                }
            }
        }

        self.state
            .notified
            .store(sleepers.is_notified(), Ordering::Release);

        true
    }

    /// Moves the ticker into woken state.
    fn wake(&mut self) {
        if self.sleeping != 0 {
            let mut sleepers = self
                .state
                .sleepers
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            sleepers.remove(self.sleeping);

            self.state
                .notified
                .store(sleepers.is_notified(), Ordering::Release);
        }
        self.sleeping = 0;
    }

    /// Waits for the next runnable task to run.
    async fn runnable(&mut self) -> Runnable {
        self.runnable_with(|| self.state.queue.pop().ok()).await
    }

    /// Waits for the next runnable task to run, given a function that searches for a task.
    async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
        future::poll_fn(|cx| {
            loop {
                match search() {
                    None => {
                        // Move to sleeping and unnotified state.
                        if !self.sleep(cx.waker()) {
                            // If already sleeping and unnotified, return.
                            return Poll::Pending;
                        }
                    }
                    Some(r) => {
                        // Wake up.
                        self.wake();

                        // Notify another ticker now to pick up where this ticker left off, just in
                        // case running the task takes a long time.
                        self.state.notify();

                        return Poll::Ready(r);
                    }
                }
            }
        })
        .await
    }
}

impl Drop for Ticker<'_> {
    fn drop(&mut self) {
        // If this ticker is in sleeping state, it must be removed from the sleepers list.
        if self.sleeping != 0 {
            let mut sleepers = self
                .state
                .sleepers
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            let notified = sleepers.remove(self.sleeping);

            self.state
                .notified
                .store(sleepers.is_notified(), Ordering::Release);

            // If this ticker was notified, then notify another ticker.
            if notified {
                drop(sleepers);
                self.state.notify();
            }
        }
    }
}

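// A hedged sketch, not part of the upstream crate (module name is illustrative), of the ticker
// states listed above: with an empty queue the first `sleep` registers the ticker, and a second
// `sleep` with the same waker reports that it is still sleeping and unnotified.
#[cfg(test)]
mod ticker_sketch {
    use super::*;

    #[test]
    fn ticker_stays_asleep_until_woken() {
        let state = State::new();

        future::block_on(future::poll_fn(|cx| {
            let mut ticker = Ticker::new(&state);

            // First call: the ticker moves into the sleeping and unnotified state.
            assert!(ticker.sleep(cx.waker()));
            // Second call: still sleeping and unnotified, so `sleep` returns `false`.
            assert!(!ticker.sleep(cx.waker()));

            // Waking removes the ticker from the sleepers list again.
            ticker.wake();

            Poll::Ready(())
        }));
    }
}
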
/// A worker in a work-stealing executor.
///
/// This is just a ticker that also has an associated local queue for improved cache locality.
struct Runner<'a> {
    /// The executor state.
    state: &'a State,

    /// Inner ticker.
    ticker: Ticker<'a>,

    /// The local queue.
    local: Arc<ConcurrentQueue<Runnable>>,

    /// Bumped every time a runnable task is found.
    ticks: usize,
}

impl<'a> Runner<'a> {
    /// Creates a runner and registers it in the executor state.
    fn new(state: &'a State) -> Self {
        let runner = Self {
            state,
            ticker: Ticker::new(state),
            local: Arc::new(ConcurrentQueue::bounded(512)),
            ticks: 0,
        };
        state
            .local_queues
            .write()
            .unwrap_or_else(PoisonError::into_inner)
            .push(runner.local.clone());
        runner
    }

    /// Waits for the next runnable task to run.
    async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
        let runnable = self
            .ticker
            .runnable_with(|| {
                // Try the local queue.
                if let Ok(r) = self.local.pop() {
                    return Some(r);
                }

                // Try stealing from the global queue.
                if let Ok(r) = self.state.queue.pop() {
                    steal(&self.state.queue, &self.local);
                    return Some(r);
                }

                // Try stealing from other runners.
                if let Ok(local_queues) = self.state.local_queues.try_read() {
                    // Pick a random starting point in the iterator list and rotate the list.
                    let n = local_queues.len();
                    let start = rng.usize(..n);
                    let iter = local_queues
                        .iter()
                        .chain(local_queues.iter())
                        .skip(start)
                        .take(n);

                    // Remove this runner's local queue.
                    let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));

                    // Try stealing from each local queue in the list.
                    for local in iter {
                        steal(local, &self.local);
                        if let Ok(r) = self.local.pop() {
                            return Some(r);
                        }
                    }
                }

                None
            })
            .await;

        // Bump the tick counter.
        self.ticks = self.ticks.wrapping_add(1);

        if self.ticks % 64 == 0 {
            // Steal tasks from the global queue to ensure fair task scheduling.
            steal(&self.state.queue, &self.local);
        }

        runnable
    }
}

impl Drop for Runner<'_> {
    fn drop(&mut self) {
        // Remove the local queue.
        self.state
            .local_queues
            .write()
            .unwrap_or_else(PoisonError::into_inner)
            .retain(|local| !Arc::ptr_eq(local, &self.local));

        // Re-schedule remaining tasks in the local queue.
        while let Ok(r) = self.local.pop() {
            r.schedule();
        }
    }
}

/// Steals some items from one queue into another.
fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
    // Half of `src`'s length rounded up.
    let mut count = (src.len() + 1) / 2;

    if count > 0 {
        // Don't steal more than fits into the queue.
        if let Some(cap) = dest.capacity() {
            count = count.min(cap - dest.len());
        }

        // Steal tasks.
        for _ in 0..count {
            if let Ok(t) = src.pop() {
                assert!(dest.push(t).is_ok());
            } else {
                break;
            }
        }
    }
}

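// A hedged sketch, not part of the upstream crate (module name is illustrative), of the stealing
// policy above: move roughly half of the source queue, but never more than the destination has
// room for.
#[cfg(test)]
mod steal_sketch {
    use super::*;

    #[test]
    fn steals_half_capped_by_destination_capacity() {
        let src = ConcurrentQueue::unbounded();
        let dest = ConcurrentQueue::bounded(2);
        for i in 0..8 {
            src.push(i).unwrap();
        }

        // Half of 8 items is 4, but `dest` only has room for 2 of them.
        steal(&src, &dest);
        assert_eq!(dest.len(), 2);
        assert_eq!(src.len(), 6);
    }
}
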
/// Debug implementation for `Executor` and `LocalExecutor`.
fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    // Get a reference to the state.
    let ptr = executor.state.load(Ordering::Acquire);
    if ptr.is_null() {
        // The executor has not been initialized.
        struct Uninitialized;

        impl fmt::Debug for Uninitialized {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.write_str("<uninitialized>")
            }
        }

        return f.debug_tuple(name).field(&Uninitialized).finish();
    }

    // SAFETY: If the state pointer is not null, it must have been
    // allocated properly by Arc::new and converted via Arc::into_raw
    // in alloc_state.
    let state = unsafe { &*ptr };

    debug_state(state, name, f)
}

/// Debug implementation for `Executor` and `LocalExecutor`.
fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    /// Debug wrapper for the number of active tasks.
    struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>);

    impl fmt::Debug for ActiveTasks<'_> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self.0.try_lock() {
                Ok(lock) => fmt::Debug::fmt(&lock.len(), f),
                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
                Err(TryLockError::Poisoned(err)) => fmt::Debug::fmt(&err.into_inner().len(), f),
            }
        }
    }

    /// Debug wrapper for the local runners.
    struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);

    impl fmt::Debug for LocalRunners<'_> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self.0.try_read() {
                Ok(lock) => f
                    .debug_list()
                    .entries(lock.iter().map(|queue| queue.len()))
                    .finish(),
                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
            }
        }
    }

    /// Debug wrapper for the sleepers.
    struct SleepCount<'a>(&'a Mutex<Sleepers>);

    impl fmt::Debug for SleepCount<'_> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self.0.try_lock() {
                Ok(lock) => fmt::Debug::fmt(&lock.count, f),
                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
            }
        }
    }

    f.debug_struct(name)
        .field("active", &ActiveTasks(&state.active))
        .field("global_tasks", &state.queue.len())
        .field("local_runners", &LocalRunners(&state.local_queues))
        .field("sleepers", &SleepCount(&state.sleepers))
        .finish()
}

/// Runs a closure when dropped.
struct CallOnDrop<F: FnMut()>(F);

impl<F: FnMut()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        (self.0)();
    }
}

pin_project! {
    /// A wrapper around a future, running a closure when dropped.
    struct AsyncCallOnDrop<Fut, Cleanup: FnMut()> {
        #[pin]
        future: Fut,
        cleanup: CallOnDrop<Cleanup>,
    }
}

impl<Fut, Cleanup: FnMut()> AsyncCallOnDrop<Fut, Cleanup> {
    fn new(future: Fut, cleanup: Cleanup) -> Self {
        Self {
            future,
            cleanup: CallOnDrop(cleanup),
        }
    }
}

impl<Fut: Future, Cleanup: FnMut()> Future for AsyncCallOnDrop<Fut, Cleanup> {
    type Output = Fut::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.project().future.poll(cx)
    }
}

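// A hedged sketch, not part of the upstream crate (module name is illustrative), of how
// `AsyncCallOnDrop` is used above: the cleanup closure runs when the wrapped future is dropped,
// which is how finished tasks remove themselves from the `active` slab.
#[cfg(test)]
mod async_call_on_drop_sketch {
    use super::*;
    use core::cell::Cell;

    #[test]
    fn cleanup_runs_when_the_future_is_dropped() {
        let cleaned_up = Rc::new(Cell::new(false));
        let flag = cleaned_up.clone();

        let fut = AsyncCallOnDrop::new(async { 42 }, move || flag.set(true));

        // `block_on` polls the future to completion and then drops it, running the cleanup.
        assert_eq!(future::block_on(fut), 42);
        assert!(cleaned_up.get());
    }
}
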
fn _ensure_send_and_sync() {
    use futures_lite::future::pending;

    fn is_send<T: Send>(_: T) {}
    fn is_sync<T: Sync>(_: T) {}
    fn is_static<T: 'static>(_: T) {}

    is_send::<Executor<'_>>(Executor::new());
    is_sync::<Executor<'_>>(Executor::new());

    let ex = Executor::new();
    let state = ex.state();
    is_send(ex.run(pending::<()>()));
    is_sync(ex.run(pending::<()>()));
    is_send(ex.tick());
    is_sync(ex.tick());
    is_send(Executor::schedule(state));
    is_sync(Executor::schedule(state));
    is_static(Executor::schedule(state));

    /// ```compile_fail
    /// use async_executor::LocalExecutor;
    /// use futures_lite::future::pending;
    ///
    /// fn is_send<T: Send>(_: T) {}
    /// fn is_sync<T: Sync>(_: T) {}
    ///
    /// is_send::<LocalExecutor<'_>>(LocalExecutor::new());
    /// is_sync::<LocalExecutor<'_>>(LocalExecutor::new());
    ///
    /// let ex = LocalExecutor::new();
    /// is_send(ex.run(pending::<()>()));
    /// is_sync(ex.run(pending::<()>()));
    /// is_send(ex.tick());
    /// is_sync(ex.tick());
    /// ```
    fn _negative_test() {}
}