Skip to main content

async_executor/
lib.rs

1//! Async executors.
2//!
3//! This crate provides two reference executors that trade performance for
4//! functionality. They should be considered reference executors that are "good
5//! enough" for most use cases. For more specialized use cases, consider writing
6//! your own executor on top of [`async-task`].
7//!
8//! [`async-task`]: https://crates.io/crates/async-task
9//!
10//! # Examples
11//!
12//! ```
13//! use async_executor::Executor;
14//! use futures_lite::future;
15//!
16//! // Create a new executor.
17//! let ex = Executor::new();
18//!
19//! // Spawn a task.
20//! let task = ex.spawn(async {
21//!     println!("Hello world");
22//! });
23//!
24//! // Run the executor until the task completes.
25//! future::block_on(ex.run(task));
26//! ```
27
28#![warn(
29    missing_docs,
30    missing_debug_implementations,
31    rust_2018_idioms,
32    clippy::undocumented_unsafe_blocks
33)]
34#![doc(
35    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
36)]
37#![doc(
38    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
39)]
40#![cfg_attr(docsrs, feature(doc_cfg))]
41#![allow(clippy::unused_unit)] // false positive fixed in Rust 1.89
42
43extern crate alloc;
44
45use alloc::rc::Rc;
46use alloc::sync::Arc;
47use alloc::vec::Vec;
48use core::fmt;
49use core::marker::PhantomData;
50use core::panic::{RefUnwindSafe, UnwindSafe};
51use core::pin::Pin;
52use core::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
53use core::task::{Context, Poll, Waker};
54use std::sync::{Mutex, MutexGuard, PoisonError, RwLock, TryLockError};
55
56use async_task::{Builder, Runnable};
57use concurrent_queue::ConcurrentQueue;
58use futures_lite::{future, prelude::*};
59use pin_project_lite::pin_project;
60use slab::Slab;
61
62#[cfg(feature = "static")]
63mod static_executors;
64
65#[doc(no_inline)]
66pub use async_task::{FallibleTask, Task};
67#[cfg(feature = "static")]
68#[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))]
69pub use static_executors::*;
70
71/// An async executor.
72///
73/// # Examples
74///
75/// A multi-threaded executor:
76///
77/// ```
78/// use async_channel::unbounded;
79/// use async_executor::Executor;
80/// use easy_parallel::Parallel;
81/// use futures_lite::future;
82///
83/// let ex = Executor::new();
84/// let (signal, shutdown) = unbounded::<()>();
85///
86/// Parallel::new()
87///     // Run four executor threads.
88///     .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
89///     // Run the main future on the current thread.
90///     .finish(|| future::block_on(async {
91///         println!("Hello world!");
92///         drop(signal);
93///     }));
94/// ```
95pub struct Executor<'a> {
96    /// The executor state.
97    state: AtomicPtr<State>,
98
99    /// Makes the `'a` lifetime invariant.
100    _marker: PhantomData<core::cell::UnsafeCell<&'a ()>>,
101}
102
103// SAFETY: Executor stores no thread local state that can be accessed via other thread.
104unsafe impl Send for Executor<'_> {}
105// SAFETY: Executor internally synchronizes all of it's operations internally.
106unsafe impl Sync for Executor<'_> {}
107
108impl UnwindSafe for Executor<'_> {}
109impl RefUnwindSafe for Executor<'_> {}
110
111impl fmt::Debug for Executor<'_> {
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113        debug_executor(self, "Executor", f)
114    }
115}
116
117impl<'a> Executor<'a> {
118    /// Creates a new executor.
119    ///
120    /// # Examples
121    ///
122    /// ```
123    /// use async_executor::Executor;
124    ///
125    /// let ex = Executor::new();
126    /// ```
127    pub const fn new() -> Self {
128        Self {
129            state: AtomicPtr::new(core::ptr::null_mut()),
130            _marker: PhantomData,
131        }
132    }
133
134    /// Returns `true` if there are no unfinished tasks.
135    ///
136    /// # Examples
137    ///
138    /// ```
139    /// use async_executor::Executor;
140    ///
141    /// let ex = Executor::new();
142    /// assert!(ex.is_empty());
143    ///
144    /// let task = ex.spawn(async {
145    ///     println!("Hello world");
146    /// });
147    /// assert!(!ex.is_empty());
148    ///
149    /// assert!(ex.try_tick());
150    /// assert!(ex.is_empty());
151    /// ```
152    pub fn is_empty(&self) -> bool {
153        self.state().active().is_empty()
154    }
155
156    /// Spawns a task onto the executor.
157    ///
158    /// # Examples
159    ///
160    /// ```
161    /// use async_executor::Executor;
162    ///
163    /// let ex = Executor::new();
164    ///
165    /// let task = ex.spawn(async {
166    ///     println!("Hello world");
167    /// });
168    /// ```
169    pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
170        let state = self.state();
171        let mut active = state.active();
172
173        // SAFETY: `T` and the future are `Send`.
174        unsafe { Self::spawn_inner(state, future, &mut active) }
175    }
176
177    /// Spawns many tasks onto the executor.
178    ///
179    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
180    /// spawns all of the tasks in one go. With large amounts of tasks this can improve
181    /// contention.
182    ///
183    /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
184    /// prevent runner thread starvation. It is assumed that the iterator provided does not
185    /// block; blocking iterators can lock up the internal mutex and therefore the entire
186    /// executor.
187    ///
188    /// ## Example
189    ///
190    /// ```
191    /// use async_executor::Executor;
192    /// use futures_lite::{stream, prelude::*};
193    /// use core::future::ready;
194    ///
195    /// # futures_lite::future::block_on(async {
196    /// let mut ex = Executor::new();
197    ///
198    /// let futures = [
199    ///     ready(1),
200    ///     ready(2),
201    ///     ready(3)
202    /// ];
203    ///
204    /// // Spawn all of the futures onto the executor at once.
205    /// let mut tasks = vec![];
206    /// ex.spawn_many(futures, &mut tasks);
207    ///
208    /// // Await all of them.
209    /// let results = ex.run(async move {
210    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
211    /// }).await;
212    /// assert_eq!(results, [1, 2, 3]);
213    /// # });
214    /// ```
215    ///
216    /// [`spawn`]: Executor::spawn
217    pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
218        &self,
219        futures: impl IntoIterator<Item = F>,
220        handles: &mut impl Extend<Task<F::Output>>,
221    ) {
222        let state = self.state();
223        let mut active = Some(state.as_ref().active());
224
225        // Convert the futures into tasks.
226        let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
227            // SAFETY: `T` and the future are `Send`.
228            let task = unsafe { Self::spawn_inner(state, future, active.as_mut().unwrap()) };
229
230            // Yield the lock every once in a while to ease contention.
231            if i.wrapping_sub(1) % 500 == 0 {
232                drop(active.take());
233                active = Some(self.state().active());
234            }
235
236            task
237        });
238
239        // Push the tasks to the user's collection.
240        handles.extend(tasks);
241    }
242
243    /// Spawn a future while holding the inner lock.
244    ///
245    /// # Safety
246    ///
247    /// If this is an `Executor`, `F` and `T` must be `Send`.
248    unsafe fn spawn_inner<T: 'a>(
249        state: Pin<&'a State>,
250        future: impl Future<Output = T> + 'a,
251        active: &mut Slab<Waker>,
252    ) -> Task<T> {
253        // Remove the task from the set of active tasks when the future finishes.
254        let entry = active.vacant_entry();
255        let index = entry.key();
256        let future = AsyncCallOnDrop::new(future, move || drop(state.active().try_remove(index)));
257
258        // Create the task and register it in the set of active tasks.
259        //
260        // SAFETY:
261        //
262        // If `future` is not `Send`, this must be a `LocalExecutor` as per this
263        // function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
264        // `try_tick`, `tick` and `run` can only be called from the origin
265        // thread of the `LocalExecutor`. Similarly, `spawn` can only  be called
266        // from the origin thread, ensuring that `future` and the executor share
267        // the same origin thread. The `Runnable` can be scheduled from other
268        // threads, but because of the above `Runnable` can only be called or
269        // dropped on the origin thread.
270        //
271        // `future` is not `'static`, but we make sure that the `Runnable` does
272        // not outlive `'a`. When the executor is dropped, the `active` field is
273        // drained and all of the `Waker`s are woken. Then, the queue inside of
274        // the `Executor` is drained of all of its runnables. This ensures that
275        // runnables are dropped and this precondition is satisfied.
276        //
277        // `Self::schedule` is `Send` and `Sync`, as checked below.
278        // Therefore we do not need to worry about which thread the `Waker` is used
279        // and dropped on.
280        //
281        // `Self::schedule` may not be `'static`, but we make sure that the `Waker` does
282        // not outlive `'a`. When the executor is dropped, the `active` field is
283        // drained and all of the `Waker`s are woken.
284        let (runnable, task) = Builder::new()
285            .propagate_panic(true)
286            .spawn_unchecked(|()| future, Self::schedule(state));
287        entry.insert(runnable.waker());
288
289        runnable.schedule();
290        task
291    }
292
293    /// Attempts to run a task if at least one is scheduled.
294    ///
295    /// Running a scheduled task means simply polling its future once.
296    ///
297    /// # Examples
298    ///
299    /// ```
300    /// use async_executor::Executor;
301    ///
302    /// let ex = Executor::new();
303    /// assert!(!ex.try_tick()); // no tasks to run
304    ///
305    /// let task = ex.spawn(async {
306    ///     println!("Hello world");
307    /// });
308    /// assert!(ex.try_tick()); // a task was found
309    /// ```
310    pub fn try_tick(&self) -> bool {
311        self.state().try_tick()
312    }
313
314    /// Runs a single task.
315    ///
316    /// Running a task means simply polling its future once.
317    ///
318    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
319    ///
320    /// # Examples
321    ///
322    /// ```
323    /// use async_executor::Executor;
324    /// use futures_lite::future;
325    ///
326    /// let ex = Executor::new();
327    ///
328    /// let task = ex.spawn(async {
329    ///     println!("Hello world");
330    /// });
331    /// future::block_on(ex.tick()); // runs the task
332    /// ```
333    pub async fn tick(&self) {
334        self.state().tick().await;
335    }
336
337    /// Runs the executor until the given future completes.
338    ///
339    /// # Examples
340    ///
341    /// ```
342    /// use async_executor::Executor;
343    /// use futures_lite::future;
344    ///
345    /// let ex = Executor::new();
346    ///
347    /// let task = ex.spawn(async { 1 + 2 });
348    /// let res = future::block_on(ex.run(async { task.await * 2 }));
349    ///
350    /// assert_eq!(res, 6);
351    /// ```
352    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
353        self.state().run(future).await
354    }
355
356    /// Returns a function that schedules a runnable task when it gets woken up.
357    fn schedule(state: Pin<&'a State>) -> impl Fn(Runnable) + Send + Sync + 'a {
358        // TODO: If possible, push into the current local queue and notify the ticker.
359        move |runnable| {
360            let result = state.queue.push(runnable);
361            debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail.
362            state.notify();
363        }
364    }
365
366    /// Returns a pointer to the inner state.
367    #[inline]
368    fn state(&self) -> Pin<&'a State> {
369        #[cold]
370        fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State {
371            let state = Arc::new(State::new());
372            let ptr = Arc::into_raw(state).cast_mut();
373            if let Err(actual) = atomic_ptr.compare_exchange(
374                core::ptr::null_mut(),
375                ptr,
376                Ordering::AcqRel,
377                Ordering::Acquire,
378            ) {
379                // SAFETY: This was just created from Arc::into_raw.
380                drop(unsafe { Arc::from_raw(ptr) });
381                actual
382            } else {
383                ptr
384            }
385        }
386
387        let mut ptr = self.state.load(Ordering::Acquire);
388        if ptr.is_null() {
389            ptr = alloc_state(&self.state);
390        }
391
392        // SAFETY: So long as an Executor lives, it's state pointer will always be valid
393        // and will never be moved until it's dropped.
394        Pin::new(unsafe { &*ptr })
395    }
396}
397
398impl Drop for Executor<'_> {
399    fn drop(&mut self) {
400        let ptr = *self.state.get_mut();
401        if ptr.is_null() {
402            return;
403        }
404
405        // SAFETY: As ptr is not null, it was allocated via Arc::new and converted
406        // via Arc::into_raw in state_ptr.
407        let state = unsafe { Arc::from_raw(ptr) };
408
409        let mut active = state.pin().active();
410        for w in active.drain() {
411            w.wake();
412        }
413        drop(active);
414
415        while state.queue.pop().is_ok() {}
416    }
417}
418
419impl<'a> Default for Executor<'a> {
420    fn default() -> Self {
421        Self::new()
422    }
423}
424
425/// A thread-local executor.
426///
427/// The executor can only be run on the thread that created it.
428///
429/// # Examples
430///
431/// ```
432/// use async_executor::LocalExecutor;
433/// use futures_lite::future;
434///
435/// let local_ex = LocalExecutor::new();
436///
437/// future::block_on(local_ex.run(async {
438///     println!("Hello world!");
439/// }));
440/// ```
441pub struct LocalExecutor<'a> {
442    /// The inner executor.
443    inner: Executor<'a>,
444
445    /// Makes the type `!Send` and `!Sync`.
446    _marker: PhantomData<Rc<()>>,
447}
448
449impl UnwindSafe for LocalExecutor<'_> {}
450impl RefUnwindSafe for LocalExecutor<'_> {}
451
452impl fmt::Debug for LocalExecutor<'_> {
453    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
454        debug_executor(&self.inner, "LocalExecutor", f)
455    }
456}
457
458impl<'a> LocalExecutor<'a> {
459    /// Creates a single-threaded executor.
460    ///
461    /// # Examples
462    ///
463    /// ```
464    /// use async_executor::LocalExecutor;
465    ///
466    /// let local_ex = LocalExecutor::new();
467    /// ```
468    pub const fn new() -> Self {
469        Self {
470            inner: Executor::new(),
471            _marker: PhantomData,
472        }
473    }
474
475    /// Returns `true` if there are no unfinished tasks.
476    ///
477    /// # Examples
478    ///
479    /// ```
480    /// use async_executor::LocalExecutor;
481    ///
482    /// let local_ex = LocalExecutor::new();
483    /// assert!(local_ex.is_empty());
484    ///
485    /// let task = local_ex.spawn(async {
486    ///     println!("Hello world");
487    /// });
488    /// assert!(!local_ex.is_empty());
489    ///
490    /// assert!(local_ex.try_tick());
491    /// assert!(local_ex.is_empty());
492    /// ```
493    pub fn is_empty(&self) -> bool {
494        self.inner().is_empty()
495    }
496
497    /// Spawns a task onto the executor.
498    ///
499    /// # Examples
500    ///
501    /// ```
502    /// use async_executor::LocalExecutor;
503    ///
504    /// let local_ex = LocalExecutor::new();
505    ///
506    /// let task = local_ex.spawn(async {
507    ///     println!("Hello world");
508    /// });
509    /// ```
510    pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
511        let state = self.inner().state();
512        let mut active = state.active();
513
514        // SAFETY: This executor is not thread safe, so the future and its result
515        //         cannot be sent to another thread.
516        unsafe { Executor::spawn_inner(state, future, &mut active) }
517    }
518
519    /// Spawns many tasks onto the executor.
520    ///
521    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
522    /// spawns all of the tasks in one go. With large amounts of tasks this can improve
523    /// contention.
524    ///
525    /// It is assumed that the iterator provided does not block; blocking iterators can lock up
526    /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the
527    /// mutex is not released, as there are no other threads that can poll this executor.
528    ///
529    /// ## Example
530    ///
531    /// ```
532    /// use async_executor::LocalExecutor;
533    /// use futures_lite::{stream, prelude::*};
534    /// use core::future::ready;
535    ///
536    /// # futures_lite::future::block_on(async {
537    /// let mut ex = LocalExecutor::new();
538    ///
539    /// let futures = [
540    ///     ready(1),
541    ///     ready(2),
542    ///     ready(3)
543    /// ];
544    ///
545    /// // Spawn all of the futures onto the executor at once.
546    /// let mut tasks = vec![];
547    /// ex.spawn_many(futures, &mut tasks);
548    ///
549    /// // Await all of them.
550    /// let results = ex.run(async move {
551    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
552    /// }).await;
553    /// assert_eq!(results, [1, 2, 3]);
554    /// # });
555    /// ```
556    ///
557    /// [`spawn`]: LocalExecutor::spawn
558    pub fn spawn_many<T: 'a, F: Future<Output = T> + 'a>(
559        &self,
560        futures: impl IntoIterator<Item = F>,
561        handles: &mut impl Extend<Task<F::Output>>,
562    ) {
563        let state = self.inner().state();
564        let mut active = state.active();
565
566        // Convert all of the futures to tasks.
567        let tasks = futures.into_iter().map(|future| {
568            // SAFETY: This executor is not thread safe, so the future and its result
569            //         cannot be sent to another thread.
570            unsafe { Executor::spawn_inner(state, future, &mut active) }
571
572            // As only one thread can spawn or poll tasks at a time, there is no need
573            // to release lock contention here.
574        });
575
576        // Push them to the user's collection.
577        handles.extend(tasks);
578    }
579
580    /// Attempts to run a task if at least one is scheduled.
581    ///
582    /// Running a scheduled task means simply polling its future once.
583    ///
584    /// # Examples
585    ///
586    /// ```
587    /// use async_executor::LocalExecutor;
588    ///
589    /// let ex = LocalExecutor::new();
590    /// assert!(!ex.try_tick()); // no tasks to run
591    ///
592    /// let task = ex.spawn(async {
593    ///     println!("Hello world");
594    /// });
595    /// assert!(ex.try_tick()); // a task was found
596    /// ```
597    pub fn try_tick(&self) -> bool {
598        self.inner().try_tick()
599    }
600
601    /// Runs a single task.
602    ///
603    /// Running a task means simply polling its future once.
604    ///
605    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
606    ///
607    /// # Examples
608    ///
609    /// ```
610    /// use async_executor::LocalExecutor;
611    /// use futures_lite::future;
612    ///
613    /// let ex = LocalExecutor::new();
614    ///
615    /// let task = ex.spawn(async {
616    ///     println!("Hello world");
617    /// });
618    /// future::block_on(ex.tick()); // runs the task
619    /// ```
620    pub async fn tick(&self) {
621        self.inner().tick().await
622    }
623
624    /// Runs the executor until the given future completes.
625    ///
626    /// # Examples
627    ///
628    /// ```
629    /// use async_executor::LocalExecutor;
630    /// use futures_lite::future;
631    ///
632    /// let local_ex = LocalExecutor::new();
633    ///
634    /// let task = local_ex.spawn(async { 1 + 2 });
635    /// let res = future::block_on(local_ex.run(async { task.await * 2 }));
636    ///
637    /// assert_eq!(res, 6);
638    /// ```
639    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
640        self.inner().run(future).await
641    }
642
643    /// Returns a reference to the inner executor.
644    fn inner(&self) -> &Executor<'a> {
645        &self.inner
646    }
647}
648
649impl<'a> Default for LocalExecutor<'a> {
650    fn default() -> Self {
651        Self::new()
652    }
653}
654
655/// The state of a executor.
656struct State {
657    /// The global queue.
658    queue: ConcurrentQueue<Runnable>,
659
660    /// Local queues created by runners.
661    local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
662
663    /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
664    notified: AtomicBool,
665
666    /// A list of sleeping tickers.
667    sleepers: Mutex<Sleepers>,
668
669    /// Currently active tasks.
670    active: Mutex<Slab<Waker>>,
671}
672
673impl State {
674    /// Creates state for a new executor.
675    const fn new() -> Self {
676        Self {
677            queue: ConcurrentQueue::unbounded(),
678            local_queues: RwLock::new(Vec::new()),
679            notified: AtomicBool::new(true),
680            sleepers: Mutex::new(Sleepers {
681                count: 0,
682                wakers: Vec::new(),
683                free_ids: Vec::new(),
684            }),
685            active: Mutex::new(Slab::new()),
686        }
687    }
688
689    fn pin(&self) -> Pin<&Self> {
690        Pin::new(self)
691    }
692
693    /// Returns a reference to currently active tasks.
694    fn active(self: Pin<&Self>) -> MutexGuard<'_, Slab<Waker>> {
695        self.get_ref()
696            .active
697            .lock()
698            .unwrap_or_else(PoisonError::into_inner)
699    }
700
701    /// Notifies a sleeping ticker.
702    #[inline]
703    fn notify(&self) {
704        if self
705            .notified
706            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
707            .is_ok()
708        {
709            let waker = self
710                .sleepers
711                .lock()
712                .unwrap_or_else(PoisonError::into_inner)
713                .notify();
714            if let Some(w) = waker {
715                w.wake();
716            }
717        }
718    }
719
720    pub(crate) fn try_tick(&self) -> bool {
721        match self.queue.pop() {
722            Err(_) => false,
723            Ok(runnable) => {
724                // Notify another ticker now to pick up where this ticker left off, just in case
725                // running the task takes a long time.
726                self.notify();
727
728                // Run the task.
729                runnable.run();
730                true
731            }
732        }
733    }
734
735    pub(crate) async fn tick(&self) {
736        let runnable = Ticker::new(self).runnable().await;
737        runnable.run();
738    }
739
740    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
741        let mut runner = Runner::new(self);
742        let mut rng = fastrand::Rng::new();
743
744        // A future that runs tasks forever.
745        let run_forever = async {
746            loop {
747                for _ in 0..200 {
748                    let runnable = runner.runnable(&mut rng).await;
749                    runnable.run();
750                }
751                future::yield_now().await;
752            }
753        };
754
755        // Run `future` and `run_forever` concurrently until `future` completes.
756        future.or(run_forever).await
757    }
758}
759
760/// A list of sleeping tickers.
761struct Sleepers {
762    /// Number of sleeping tickers (both notified and unnotified).
763    count: usize,
764
765    /// IDs and wakers of sleeping unnotified tickers.
766    ///
767    /// A sleeping ticker is notified when its waker is missing from this list.
768    wakers: Vec<(usize, Waker)>,
769
770    /// Reclaimed IDs.
771    free_ids: Vec<usize>,
772}
773
774impl Sleepers {
775    /// Inserts a new sleeping ticker.
776    fn insert(&mut self, waker: &Waker) -> usize {
777        let id = match self.free_ids.pop() {
778            Some(id) => id,
779            None => self.count + 1,
780        };
781        self.count += 1;
782        self.wakers.push((id, waker.clone()));
783        id
784    }
785
786    /// Re-inserts a sleeping ticker's waker if it was notified.
787    ///
788    /// Returns `true` if the ticker was notified.
789    fn update(&mut self, id: usize, waker: &Waker) -> bool {
790        for item in &mut self.wakers {
791            if item.0 == id {
792                item.1.clone_from(waker);
793                return false;
794            }
795        }
796
797        self.wakers.push((id, waker.clone()));
798        true
799    }
800
801    /// Removes a previously inserted sleeping ticker.
802    ///
803    /// Returns `true` if the ticker was notified.
804    fn remove(&mut self, id: usize) -> bool {
805        self.count -= 1;
806        self.free_ids.push(id);
807
808        for i in (0..self.wakers.len()).rev() {
809            if self.wakers[i].0 == id {
810                self.wakers.remove(i);
811                return false;
812            }
813        }
814        true
815    }
816
817    /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
818    fn is_notified(&self) -> bool {
819        self.count == 0 || self.count > self.wakers.len()
820    }
821
822    /// Returns notification waker for a sleeping ticker.
823    ///
824    /// If a ticker was notified already or there are no tickers, `None` will be returned.
825    fn notify(&mut self) -> Option<Waker> {
826        if self.wakers.len() == self.count {
827            self.wakers.pop().map(|item| item.1)
828        } else {
829            None
830        }
831    }
832}
833
834/// Runs task one by one.
835struct Ticker<'a> {
836    /// The executor state.
837    state: &'a State,
838
839    /// Set to a non-zero sleeper ID when in sleeping state.
840    ///
841    /// States a ticker can be in:
842    /// 1) Woken.
843    ///    2a) Sleeping and unnotified.
844    ///    2b) Sleeping and notified.
845    sleeping: usize,
846}
847
848impl<'a> Ticker<'a> {
849    /// Creates a ticker.
850    fn new(state: &'a State) -> Self {
851        Self { state, sleeping: 0 }
852    }
853
854    /// Moves the ticker into sleeping and unnotified state.
855    ///
856    /// Returns `false` if the ticker was already sleeping and unnotified.
857    fn sleep(&mut self, waker: &Waker) -> bool {
858        let mut sleepers = self
859            .state
860            .sleepers
861            .lock()
862            .unwrap_or_else(PoisonError::into_inner);
863
864        match self.sleeping {
865            // Move to sleeping state.
866            0 => {
867                self.sleeping = sleepers.insert(waker);
868            }
869
870            // Already sleeping, check if notified.
871            id => {
872                if !sleepers.update(id, waker) {
873                    return false;
874                }
875            }
876        }
877
878        self.state
879            .notified
880            .store(sleepers.is_notified(), Ordering::Release);
881
882        true
883    }
884
885    /// Moves the ticker into woken state.
886    fn wake(&mut self) {
887        if self.sleeping != 0 {
888            let mut sleepers = self
889                .state
890                .sleepers
891                .lock()
892                .unwrap_or_else(PoisonError::into_inner);
893            sleepers.remove(self.sleeping);
894
895            self.state
896                .notified
897                .store(sleepers.is_notified(), Ordering::Release);
898        }
899        self.sleeping = 0;
900    }
901
902    /// Waits for the next runnable task to run.
903    async fn runnable(&mut self) -> Runnable {
904        self.runnable_with(|| self.state.queue.pop().ok()).await
905    }
906
907    /// Waits for the next runnable task to run, given a function that searches for a task.
908    async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
909        future::poll_fn(|cx| {
910            loop {
911                match search() {
912                    None => {
913                        // Move to sleeping and unnotified state.
914                        if !self.sleep(cx.waker()) {
915                            // If already sleeping and unnotified, return.
916                            return Poll::Pending;
917                        }
918                    }
919                    Some(r) => {
920                        // Wake up.
921                        self.wake();
922
923                        // Notify another ticker now to pick up where this ticker left off, just in
924                        // case running the task takes a long time.
925                        self.state.notify();
926
927                        return Poll::Ready(r);
928                    }
929                }
930            }
931        })
932        .await
933    }
934}
935
936impl Drop for Ticker<'_> {
937    fn drop(&mut self) {
938        // If this ticker is in sleeping state, it must be removed from the sleepers list.
939        if self.sleeping != 0 {
940            let mut sleepers = self
941                .state
942                .sleepers
943                .lock()
944                .unwrap_or_else(PoisonError::into_inner);
945            let notified = sleepers.remove(self.sleeping);
946
947            self.state
948                .notified
949                .store(sleepers.is_notified(), Ordering::Release);
950
951            // If this ticker was notified, then notify another ticker.
952            if notified {
953                drop(sleepers);
954                self.state.notify();
955            }
956        }
957    }
958}
959
960/// A worker in a work-stealing executor.
961///
962/// This is just a ticker that also has an associated local queue for improved cache locality.
963struct Runner<'a> {
964    /// The executor state.
965    state: &'a State,
966
967    /// Inner ticker.
968    ticker: Ticker<'a>,
969
970    /// The local queue.
971    local: Arc<ConcurrentQueue<Runnable>>,
972
973    /// Bumped every time a runnable task is found.
974    ticks: usize,
975}
976
977impl<'a> Runner<'a> {
978    /// Creates a runner and registers it in the executor state.
979    fn new(state: &'a State) -> Self {
980        let runner = Self {
981            state,
982            ticker: Ticker::new(state),
983            local: Arc::new(ConcurrentQueue::bounded(512)),
984            ticks: 0,
985        };
986        state
987            .local_queues
988            .write()
989            .unwrap_or_else(PoisonError::into_inner)
990            .push(runner.local.clone());
991        runner
992    }
993
994    /// Waits for the next runnable task to run.
995    async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
996        let runnable = self
997            .ticker
998            .runnable_with(|| {
999                // Try the local queue.
1000                if let Ok(r) = self.local.pop() {
1001                    return Some(r);
1002                }
1003
1004                // Try stealing from the global queue.
1005                if let Ok(r) = self.state.queue.pop() {
1006                    steal(&self.state.queue, &self.local);
1007                    return Some(r);
1008                }
1009
1010                // Try stealing from other runners.
1011                if let Ok(local_queues) = self.state.local_queues.try_read() {
1012                    // Pick a random starting point in the iterator list and rotate the list.
1013                    let n = local_queues.len();
1014                    let start = rng.usize(..n);
1015                    let iter = local_queues
1016                        .iter()
1017                        .chain(local_queues.iter())
1018                        .skip(start)
1019                        .take(n);
1020
1021                    // Remove this runner's local queue.
1022                    let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));
1023
1024                    // Try stealing from each local queue in the list.
1025                    for local in iter {
1026                        steal(local, &self.local);
1027                        if let Ok(r) = self.local.pop() {
1028                            return Some(r);
1029                        }
1030                    }
1031                }
1032
1033                None
1034            })
1035            .await;
1036
1037        // Bump the tick counter.
1038        self.ticks = self.ticks.wrapping_add(1);
1039
1040        if self.ticks % 64 == 0 {
1041            // Steal tasks from the global queue to ensure fair task scheduling.
1042            steal(&self.state.queue, &self.local);
1043        }
1044
1045        runnable
1046    }
1047}
1048
1049impl Drop for Runner<'_> {
1050    fn drop(&mut self) {
1051        // Remove the local queue.
1052        self.state
1053            .local_queues
1054            .write()
1055            .unwrap_or_else(PoisonError::into_inner)
1056            .retain(|local| !Arc::ptr_eq(local, &self.local));
1057
1058        // Re-schedule remaining tasks in the local queue.
1059        while let Ok(r) = self.local.pop() {
1060            r.schedule();
1061        }
1062    }
1063}
1064
1065/// Steals some items from one queue into another.
1066fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
1067    // Half of `src`'s length rounded up.
1068    let mut count = (src.len() + 1) / 2;
1069
1070    if count > 0 {
1071        // Don't steal more than fits into the queue.
1072        if let Some(cap) = dest.capacity() {
1073            count = count.min(cap - dest.len());
1074        }
1075
1076        // Steal tasks.
1077        for _ in 0..count {
1078            if let Ok(t) = src.pop() {
1079                assert!(dest.push(t).is_ok());
1080            } else {
1081                break;
1082            }
1083        }
1084    }
1085}
1086
1087/// Debug implementation for `Executor` and `LocalExecutor`.
1088fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1089    // Get a reference to the state.
1090    let ptr = executor.state.load(Ordering::Acquire);
1091    if ptr.is_null() {
1092        // The executor has not been initialized.
1093        struct Uninitialized;
1094
1095        impl fmt::Debug for Uninitialized {
1096            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1097                f.write_str("<uninitialized>")
1098            }
1099        }
1100
1101        return f.debug_tuple(name).field(&Uninitialized).finish();
1102    }
1103
1104    // SAFETY: If the state pointer is not null, it must have been
1105    // allocated properly by Arc::new and converted via Arc::into_raw
1106    // in state_ptr.
1107    let state = unsafe { &*ptr };
1108
1109    debug_state(state, name, f)
1110}
1111
1112/// Debug implementation for `Executor` and `LocalExecutor`.
1113fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1114    /// Debug wrapper for the number of active tasks.
1115    struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>);
1116
1117    impl fmt::Debug for ActiveTasks<'_> {
1118        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1119            match self.0.try_lock() {
1120                Ok(lock) => fmt::Debug::fmt(&lock.len(), f),
1121                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1122                Err(TryLockError::Poisoned(err)) => fmt::Debug::fmt(&err.into_inner().len(), f),
1123            }
1124        }
1125    }
1126
1127    /// Debug wrapper for the local runners.
1128    struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);
1129
1130    impl fmt::Debug for LocalRunners<'_> {
1131        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1132            match self.0.try_read() {
1133                Ok(lock) => f
1134                    .debug_list()
1135                    .entries(lock.iter().map(|queue| queue.len()))
1136                    .finish(),
1137                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1138                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1139            }
1140        }
1141    }
1142
1143    /// Debug wrapper for the sleepers.
1144    struct SleepCount<'a>(&'a Mutex<Sleepers>);
1145
1146    impl fmt::Debug for SleepCount<'_> {
1147        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1148            match self.0.try_lock() {
1149                Ok(lock) => fmt::Debug::fmt(&lock.count, f),
1150                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1151                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1152            }
1153        }
1154    }
1155
1156    f.debug_struct(name)
1157        .field("active", &ActiveTasks(&state.active))
1158        .field("global_tasks", &state.queue.len())
1159        .field("local_runners", &LocalRunners(&state.local_queues))
1160        .field("sleepers", &SleepCount(&state.sleepers))
1161        .finish()
1162}
1163
1164/// Runs a closure when dropped.
1165struct CallOnDrop<F: FnMut()>(F);
1166
1167impl<F: FnMut()> Drop for CallOnDrop<F> {
1168    fn drop(&mut self) {
1169        (self.0)();
1170    }
1171}
1172
1173pin_project! {
1174    /// A wrapper around a future, running a closure when dropped.
1175    struct AsyncCallOnDrop<Fut, Cleanup: FnMut()> {
1176        #[pin]
1177        future: Fut,
1178        cleanup: CallOnDrop<Cleanup>,
1179    }
1180}
1181
1182impl<Fut, Cleanup: FnMut()> AsyncCallOnDrop<Fut, Cleanup> {
1183    fn new(future: Fut, cleanup: Cleanup) -> Self {
1184        Self {
1185            future,
1186            cleanup: CallOnDrop(cleanup),
1187        }
1188    }
1189}
1190
1191impl<Fut: Future, Cleanup: FnMut()> Future for AsyncCallOnDrop<Fut, Cleanup> {
1192    type Output = Fut::Output;
1193
1194    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1195        self.project().future.poll(cx)
1196    }
1197}
1198
1199fn _ensure_send_and_sync() {
1200    use futures_lite::future::pending;
1201
1202    fn is_send<T: Send>(_: T) {}
1203    fn is_sync<T: Sync>(_: T) {}
1204    fn is_static<T: 'static>(_: T) {}
1205
1206    is_send::<Executor<'_>>(Executor::new());
1207    is_sync::<Executor<'_>>(Executor::new());
1208
1209    let ex = Executor::new();
1210    let state = ex.state();
1211    is_send(ex.run(pending::<()>()));
1212    is_sync(ex.run(pending::<()>()));
1213    is_send(ex.tick());
1214    is_sync(ex.tick());
1215    is_send(Executor::schedule(state));
1216    is_sync(Executor::schedule(state));
1217    is_static(Executor::schedule(state));
1218
1219    /// ```compile_fail
1220    /// use async_executor::LocalExecutor;
1221    /// use futures_lite::future::pending;
1222    ///
1223    /// fn is_send<T: Send>(_: T) {}
1224    /// fn is_sync<T: Sync>(_: T) {}
1225    ///
1226    /// is_send::<LocalExecutor<'_>>(LocalExecutor::new());
1227    /// is_sync::<LocalExecutor<'_>>(LocalExecutor::new());
1228    ///
1229    /// let ex = LocalExecutor::new();
1230    /// is_send(ex.run(pending::<()>()));
1231    /// is_sync(ex.run(pending::<()>()));
1232    /// is_send(ex.tick());
1233    /// is_sync(ex.tick());
1234    /// ```
1235    fn _negative_test() {}
1236}