async_executor/
lib.rs

1//! Async executors.
2//!
3//! This crate provides two reference executors that trade performance for
4//! functionality. They should be considered reference executors that are "good
5//! enough" for most use cases. For more specialized use cases, consider writing
6//! your own executor on top of [`async-task`].
7//!
8//! [`async-task`]: https://crates.io/crates/async-task
9//!
10//! # Examples
11//!
12//! ```
13//! use async_executor::Executor;
14//! use futures_lite::future;
15//!
16//! // Create a new executor.
17//! let ex = Executor::new();
18//!
19//! // Spawn a task.
20//! let task = ex.spawn(async {
21//!     println!("Hello world");
22//! });
23//!
24//! // Run the executor until the task completes.
25//! future::block_on(ex.run(task));
26//! ```
27
28#![warn(
29    missing_docs,
30    missing_debug_implementations,
31    rust_2018_idioms,
32    clippy::undocumented_unsafe_blocks
33)]
34#![doc(
35    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
36)]
37#![doc(
38    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
39)]
40#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
41#![allow(clippy::unused_unit)] // false positive fixed in Rust 1.89
42
43use std::fmt;
44use std::marker::PhantomData;
45use std::panic::{RefUnwindSafe, UnwindSafe};
46use std::pin::Pin;
47use std::rc::Rc;
48use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
49use std::sync::{Arc, Mutex, MutexGuard, PoisonError, RwLock, TryLockError};
50use std::task::{Context, Poll, Waker};
51
52use async_task::{Builder, Runnable};
53use concurrent_queue::ConcurrentQueue;
54use futures_lite::{future, prelude::*};
55use pin_project_lite::pin_project;
56use slab::Slab;
57
58#[cfg(feature = "static")]
59mod static_executors;
60
61#[doc(no_inline)]
62pub use async_task::{FallibleTask, Task};
63#[cfg(feature = "static")]
64#[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))]
65pub use static_executors::*;
66
67/// An async executor.
68///
69/// # Examples
70///
71/// A multi-threaded executor:
72///
73/// ```
74/// use async_channel::unbounded;
75/// use async_executor::Executor;
76/// use easy_parallel::Parallel;
77/// use futures_lite::future;
78///
79/// let ex = Executor::new();
80/// let (signal, shutdown) = unbounded::<()>();
81///
82/// Parallel::new()
83///     // Run four executor threads.
84///     .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
85///     // Run the main future on the current thread.
86///     .finish(|| future::block_on(async {
87///         println!("Hello world!");
88///         drop(signal);
89///     }));
90/// ```
91pub struct Executor<'a> {
92    /// The executor state.
93    state: AtomicPtr<State>,
94
95    /// Makes the `'a` lifetime invariant.
96    _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
97}
98
99// SAFETY: Executor stores no thread local state that can be accessed via other thread.
100unsafe impl Send for Executor<'_> {}
101// SAFETY: Executor internally synchronizes all of it's operations internally.
102unsafe impl Sync for Executor<'_> {}
103
104impl UnwindSafe for Executor<'_> {}
105impl RefUnwindSafe for Executor<'_> {}
106
107impl fmt::Debug for Executor<'_> {
108    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109        debug_executor(self, "Executor", f)
110    }
111}
112
113impl<'a> Executor<'a> {
114    /// Creates a new executor.
115    ///
116    /// # Examples
117    ///
118    /// ```
119    /// use async_executor::Executor;
120    ///
121    /// let ex = Executor::new();
122    /// ```
123    pub const fn new() -> Executor<'a> {
124        Executor {
125            state: AtomicPtr::new(std::ptr::null_mut()),
126            _marker: PhantomData,
127        }
128    }
129
130    /// Returns `true` if there are no unfinished tasks.
131    ///
132    /// # Examples
133    ///
134    /// ```
135    /// use async_executor::Executor;
136    ///
137    /// let ex = Executor::new();
138    /// assert!(ex.is_empty());
139    ///
140    /// let task = ex.spawn(async {
141    ///     println!("Hello world");
142    /// });
143    /// assert!(!ex.is_empty());
144    ///
145    /// assert!(ex.try_tick());
146    /// assert!(ex.is_empty());
147    /// ```
148    pub fn is_empty(&self) -> bool {
149        self.state().active().is_empty()
150    }
151
152    /// Spawns a task onto the executor.
153    ///
154    /// # Examples
155    ///
156    /// ```
157    /// use async_executor::Executor;
158    ///
159    /// let ex = Executor::new();
160    ///
161    /// let task = ex.spawn(async {
162    ///     println!("Hello world");
163    /// });
164    /// ```
165    pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
166        let mut active = self.state().active();
167
168        // SAFETY: `T` and the future are `Send`.
169        unsafe { self.spawn_inner(future, &mut active) }
170    }
171
172    /// Spawns many tasks onto the executor.
173    ///
174    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
175    /// spawns all of the tasks in one go. With large amounts of tasks this can improve
176    /// contention.
177    ///
178    /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
179    /// prevent runner thread starvation. It is assumed that the iterator provided does not
180    /// block; blocking iterators can lock up the internal mutex and therefore the entire
181    /// executor.
182    ///
183    /// ## Example
184    ///
185    /// ```
186    /// use async_executor::Executor;
187    /// use futures_lite::{stream, prelude::*};
188    /// use std::future::ready;
189    ///
190    /// # futures_lite::future::block_on(async {
191    /// let mut ex = Executor::new();
192    ///
193    /// let futures = [
194    ///     ready(1),
195    ///     ready(2),
196    ///     ready(3)
197    /// ];
198    ///
199    /// // Spawn all of the futures onto the executor at once.
200    /// let mut tasks = vec![];
201    /// ex.spawn_many(futures, &mut tasks);
202    ///
203    /// // Await all of them.
204    /// let results = ex.run(async move {
205    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
206    /// }).await;
207    /// assert_eq!(results, [1, 2, 3]);
208    /// # });
209    /// ```
210    ///
211    /// [`spawn`]: Executor::spawn
212    pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
213        &self,
214        futures: impl IntoIterator<Item = F>,
215        handles: &mut impl Extend<Task<F::Output>>,
216    ) {
217        let mut active = Some(self.state().active());
218
219        // Convert the futures into tasks.
220        let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
221            // SAFETY: `T` and the future are `Send`.
222            let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };
223
224            // Yield the lock every once in a while to ease contention.
225            if i.wrapping_sub(1) % 500 == 0 {
226                drop(active.take());
227                active = Some(self.state().active());
228            }
229
230            task
231        });
232
233        // Push the tasks to the user's collection.
234        handles.extend(tasks);
235    }
236
237    /// Spawn a future while holding the inner lock.
238    ///
239    /// # Safety
240    ///
241    /// If this is an `Executor`, `F` and `T` must be `Send`.
242    unsafe fn spawn_inner<T: 'a>(
243        &self,
244        future: impl Future<Output = T> + 'a,
245        active: &mut Slab<Waker>,
246    ) -> Task<T> {
247        // Remove the task from the set of active tasks when the future finishes.
248        let entry = active.vacant_entry();
249        let index = entry.key();
250        let state = self.state_as_arc();
251        let future = AsyncCallOnDrop::new(future, move || drop(state.active().try_remove(index)));
252
253        // Create the task and register it in the set of active tasks.
254        //
255        // SAFETY:
256        //
257        // If `future` is not `Send`, this must be a `LocalExecutor` as per this
258        // function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
259        // `try_tick`, `tick` and `run` can only be called from the origin
260        // thread of the `LocalExecutor`. Similarly, `spawn` can only  be called
261        // from the origin thread, ensuring that `future` and the executor share
262        // the same origin thread. The `Runnable` can be scheduled from other
263        // threads, but because of the above `Runnable` can only be called or
264        // dropped on the origin thread.
265        //
266        // `future` is not `'static`, but we make sure that the `Runnable` does
267        // not outlive `'a`. When the executor is dropped, the `active` field is
268        // drained and all of the `Waker`s are woken. Then, the queue inside of
269        // the `Executor` is drained of all of its runnables. This ensures that
270        // runnables are dropped and this precondition is satisfied.
271        //
272        // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
273        // Therefore we do not need to worry about what is done with the
274        // `Waker`.
275        let (runnable, task) = Builder::new()
276            .propagate_panic(true)
277            .spawn_unchecked(|()| future, self.schedule());
278        entry.insert(runnable.waker());
279
280        runnable.schedule();
281        task
282    }
283
284    /// Attempts to run a task if at least one is scheduled.
285    ///
286    /// Running a scheduled task means simply polling its future once.
287    ///
288    /// # Examples
289    ///
290    /// ```
291    /// use async_executor::Executor;
292    ///
293    /// let ex = Executor::new();
294    /// assert!(!ex.try_tick()); // no tasks to run
295    ///
296    /// let task = ex.spawn(async {
297    ///     println!("Hello world");
298    /// });
299    /// assert!(ex.try_tick()); // a task was found
300    /// ```
301    pub fn try_tick(&self) -> bool {
302        self.state().try_tick()
303    }
304
305    /// Runs a single task.
306    ///
307    /// Running a task means simply polling its future once.
308    ///
309    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
310    ///
311    /// # Examples
312    ///
313    /// ```
314    /// use async_executor::Executor;
315    /// use futures_lite::future;
316    ///
317    /// let ex = Executor::new();
318    ///
319    /// let task = ex.spawn(async {
320    ///     println!("Hello world");
321    /// });
322    /// future::block_on(ex.tick()); // runs the task
323    /// ```
324    pub async fn tick(&self) {
325        self.state().tick().await;
326    }
327
328    /// Runs the executor until the given future completes.
329    ///
330    /// # Examples
331    ///
332    /// ```
333    /// use async_executor::Executor;
334    /// use futures_lite::future;
335    ///
336    /// let ex = Executor::new();
337    ///
338    /// let task = ex.spawn(async { 1 + 2 });
339    /// let res = future::block_on(ex.run(async { task.await * 2 }));
340    ///
341    /// assert_eq!(res, 6);
342    /// ```
343    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
344        self.state().run(future).await
345    }
346
347    /// Returns a function that schedules a runnable task when it gets woken up.
348    fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
349        let state = self.state_as_arc();
350
351        // TODO: If possible, push into the current local queue and notify the ticker.
352        move |runnable| {
353            let result = state.queue.push(runnable);
354            debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail.
355            state.notify();
356        }
357    }
358
359    /// Returns a pointer to the inner state.
360    #[inline]
361    fn state_ptr(&self) -> *const State {
362        #[cold]
363        fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State {
364            let state = Arc::new(State::new());
365            // TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65
366            let ptr = Arc::into_raw(state) as *mut State;
367            if let Err(actual) = atomic_ptr.compare_exchange(
368                std::ptr::null_mut(),
369                ptr,
370                Ordering::AcqRel,
371                Ordering::Acquire,
372            ) {
373                // SAFETY: This was just created from Arc::into_raw.
374                drop(unsafe { Arc::from_raw(ptr) });
375                actual
376            } else {
377                ptr
378            }
379        }
380
381        let mut ptr = self.state.load(Ordering::Acquire);
382        if ptr.is_null() {
383            ptr = alloc_state(&self.state);
384        }
385        ptr
386    }
387
388    /// Returns a reference to the inner state.
389    #[inline]
390    fn state(&self) -> &State {
391        // SAFETY: So long as an Executor lives, it's state pointer will always be valid
392        // when accessed through state_ptr.
393        unsafe { &*self.state_ptr() }
394    }
395
396    // Clones the inner state Arc
397    #[inline]
398    fn state_as_arc(&self) -> Arc<State> {
399        // SAFETY: So long as an Executor lives, it's state pointer will always be a valid
400        // Arc when accessed through state_ptr.
401        let arc = unsafe { Arc::from_raw(self.state_ptr()) };
402        let clone = arc.clone();
403        std::mem::forget(arc);
404        clone
405    }
406}
407
408impl Drop for Executor<'_> {
409    fn drop(&mut self) {
410        let ptr = *self.state.get_mut();
411        if ptr.is_null() {
412            return;
413        }
414
415        // SAFETY: As ptr is not null, it was allocated via Arc::new and converted
416        // via Arc::into_raw in state_ptr.
417        let state = unsafe { Arc::from_raw(ptr) };
418
419        let mut active = state.active();
420        for w in active.drain() {
421            w.wake();
422        }
423        drop(active);
424
425        while state.queue.pop().is_ok() {}
426    }
427}
428
429impl<'a> Default for Executor<'a> {
430    fn default() -> Executor<'a> {
431        Executor::new()
432    }
433}
434
435/// A thread-local executor.
436///
437/// The executor can only be run on the thread that created it.
438///
439/// # Examples
440///
441/// ```
442/// use async_executor::LocalExecutor;
443/// use futures_lite::future;
444///
445/// let local_ex = LocalExecutor::new();
446///
447/// future::block_on(local_ex.run(async {
448///     println!("Hello world!");
449/// }));
450/// ```
451pub struct LocalExecutor<'a> {
452    /// The inner executor.
453    inner: Executor<'a>,
454
455    /// Makes the type `!Send` and `!Sync`.
456    _marker: PhantomData<Rc<()>>,
457}
458
459impl UnwindSafe for LocalExecutor<'_> {}
460impl RefUnwindSafe for LocalExecutor<'_> {}
461
462impl fmt::Debug for LocalExecutor<'_> {
463    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
464        debug_executor(&self.inner, "LocalExecutor", f)
465    }
466}
467
468impl<'a> LocalExecutor<'a> {
469    /// Creates a single-threaded executor.
470    ///
471    /// # Examples
472    ///
473    /// ```
474    /// use async_executor::LocalExecutor;
475    ///
476    /// let local_ex = LocalExecutor::new();
477    /// ```
478    pub const fn new() -> LocalExecutor<'a> {
479        LocalExecutor {
480            inner: Executor::new(),
481            _marker: PhantomData,
482        }
483    }
484
485    /// Returns `true` if there are no unfinished tasks.
486    ///
487    /// # Examples
488    ///
489    /// ```
490    /// use async_executor::LocalExecutor;
491    ///
492    /// let local_ex = LocalExecutor::new();
493    /// assert!(local_ex.is_empty());
494    ///
495    /// let task = local_ex.spawn(async {
496    ///     println!("Hello world");
497    /// });
498    /// assert!(!local_ex.is_empty());
499    ///
500    /// assert!(local_ex.try_tick());
501    /// assert!(local_ex.is_empty());
502    /// ```
503    pub fn is_empty(&self) -> bool {
504        self.inner().is_empty()
505    }
506
507    /// Spawns a task onto the executor.
508    ///
509    /// # Examples
510    ///
511    /// ```
512    /// use async_executor::LocalExecutor;
513    ///
514    /// let local_ex = LocalExecutor::new();
515    ///
516    /// let task = local_ex.spawn(async {
517    ///     println!("Hello world");
518    /// });
519    /// ```
520    pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
521        let mut active = self.inner().state().active();
522
523        // SAFETY: This executor is not thread safe, so the future and its result
524        //         cannot be sent to another thread.
525        unsafe { self.inner().spawn_inner(future, &mut active) }
526    }
527
528    /// Spawns many tasks onto the executor.
529    ///
530    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
531    /// spawns all of the tasks in one go. With large amounts of tasks this can improve
532    /// contention.
533    ///
534    /// It is assumed that the iterator provided does not block; blocking iterators can lock up
535    /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the
536    /// mutex is not released, as there are no other threads that can poll this executor.
537    ///
538    /// ## Example
539    ///
540    /// ```
541    /// use async_executor::LocalExecutor;
542    /// use futures_lite::{stream, prelude::*};
543    /// use std::future::ready;
544    ///
545    /// # futures_lite::future::block_on(async {
546    /// let mut ex = LocalExecutor::new();
547    ///
548    /// let futures = [
549    ///     ready(1),
550    ///     ready(2),
551    ///     ready(3)
552    /// ];
553    ///
554    /// // Spawn all of the futures onto the executor at once.
555    /// let mut tasks = vec![];
556    /// ex.spawn_many(futures, &mut tasks);
557    ///
558    /// // Await all of them.
559    /// let results = ex.run(async move {
560    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
561    /// }).await;
562    /// assert_eq!(results, [1, 2, 3]);
563    /// # });
564    /// ```
565    ///
566    /// [`spawn`]: LocalExecutor::spawn
567    /// [`Executor::spawn_many`]: Executor::spawn_many
568    pub fn spawn_many<T: 'a, F: Future<Output = T> + 'a>(
569        &self,
570        futures: impl IntoIterator<Item = F>,
571        handles: &mut impl Extend<Task<F::Output>>,
572    ) {
573        let mut active = self.inner().state().active();
574
575        // Convert all of the futures to tasks.
576        let tasks = futures.into_iter().map(|future| {
577            // SAFETY: This executor is not thread safe, so the future and its result
578            //         cannot be sent to another thread.
579            unsafe { self.inner().spawn_inner(future, &mut active) }
580
581            // As only one thread can spawn or poll tasks at a time, there is no need
582            // to release lock contention here.
583        });
584
585        // Push them to the user's collection.
586        handles.extend(tasks);
587    }
588
589    /// Attempts to run a task if at least one is scheduled.
590    ///
591    /// Running a scheduled task means simply polling its future once.
592    ///
593    /// # Examples
594    ///
595    /// ```
596    /// use async_executor::LocalExecutor;
597    ///
598    /// let ex = LocalExecutor::new();
599    /// assert!(!ex.try_tick()); // no tasks to run
600    ///
601    /// let task = ex.spawn(async {
602    ///     println!("Hello world");
603    /// });
604    /// assert!(ex.try_tick()); // a task was found
605    /// ```
606    pub fn try_tick(&self) -> bool {
607        self.inner().try_tick()
608    }
609
610    /// Runs a single task.
611    ///
612    /// Running a task means simply polling its future once.
613    ///
614    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
615    ///
616    /// # Examples
617    ///
618    /// ```
619    /// use async_executor::LocalExecutor;
620    /// use futures_lite::future;
621    ///
622    /// let ex = LocalExecutor::new();
623    ///
624    /// let task = ex.spawn(async {
625    ///     println!("Hello world");
626    /// });
627    /// future::block_on(ex.tick()); // runs the task
628    /// ```
629    pub async fn tick(&self) {
630        self.inner().tick().await
631    }
632
633    /// Runs the executor until the given future completes.
634    ///
635    /// # Examples
636    ///
637    /// ```
638    /// use async_executor::LocalExecutor;
639    /// use futures_lite::future;
640    ///
641    /// let local_ex = LocalExecutor::new();
642    ///
643    /// let task = local_ex.spawn(async { 1 + 2 });
644    /// let res = future::block_on(local_ex.run(async { task.await * 2 }));
645    ///
646    /// assert_eq!(res, 6);
647    /// ```
648    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
649        self.inner().run(future).await
650    }
651
652    /// Returns a reference to the inner executor.
653    fn inner(&self) -> &Executor<'a> {
654        &self.inner
655    }
656}
657
658impl<'a> Default for LocalExecutor<'a> {
659    fn default() -> LocalExecutor<'a> {
660        LocalExecutor::new()
661    }
662}
663
664/// The state of a executor.
665struct State {
666    /// The global queue.
667    queue: ConcurrentQueue<Runnable>,
668
669    /// Local queues created by runners.
670    local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
671
672    /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
673    notified: AtomicBool,
674
675    /// A list of sleeping tickers.
676    sleepers: Mutex<Sleepers>,
677
678    /// Currently active tasks.
679    active: Mutex<Slab<Waker>>,
680}
681
682impl State {
683    /// Creates state for a new executor.
684    const fn new() -> State {
685        State {
686            queue: ConcurrentQueue::unbounded(),
687            local_queues: RwLock::new(Vec::new()),
688            notified: AtomicBool::new(true),
689            sleepers: Mutex::new(Sleepers {
690                count: 0,
691                wakers: Vec::new(),
692                free_ids: Vec::new(),
693            }),
694            active: Mutex::new(Slab::new()),
695        }
696    }
697
698    /// Returns a reference to currently active tasks.
699    fn active(&self) -> MutexGuard<'_, Slab<Waker>> {
700        self.active.lock().unwrap_or_else(PoisonError::into_inner)
701    }
702
703    /// Notifies a sleeping ticker.
704    #[inline]
705    fn notify(&self) {
706        if self
707            .notified
708            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
709            .is_ok()
710        {
711            let waker = self
712                .sleepers
713                .lock()
714                .unwrap_or_else(PoisonError::into_inner)
715                .notify();
716            if let Some(w) = waker {
717                w.wake();
718            }
719        }
720    }
721
722    pub(crate) fn try_tick(&self) -> bool {
723        match self.queue.pop() {
724            Err(_) => false,
725            Ok(runnable) => {
726                // Notify another ticker now to pick up where this ticker left off, just in case
727                // running the task takes a long time.
728                self.notify();
729
730                // Run the task.
731                runnable.run();
732                true
733            }
734        }
735    }
736
737    pub(crate) async fn tick(&self) {
738        let runnable = Ticker::new(self).runnable().await;
739        runnable.run();
740    }
741
742    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
743        let mut runner = Runner::new(self);
744        let mut rng = fastrand::Rng::new();
745
746        // A future that runs tasks forever.
747        let run_forever = async {
748            loop {
749                for _ in 0..200 {
750                    let runnable = runner.runnable(&mut rng).await;
751                    runnable.run();
752                }
753                future::yield_now().await;
754            }
755        };
756
757        // Run `future` and `run_forever` concurrently until `future` completes.
758        future.or(run_forever).await
759    }
760}
761
762/// A list of sleeping tickers.
763struct Sleepers {
764    /// Number of sleeping tickers (both notified and unnotified).
765    count: usize,
766
767    /// IDs and wakers of sleeping unnotified tickers.
768    ///
769    /// A sleeping ticker is notified when its waker is missing from this list.
770    wakers: Vec<(usize, Waker)>,
771
772    /// Reclaimed IDs.
773    free_ids: Vec<usize>,
774}
775
776impl Sleepers {
777    /// Inserts a new sleeping ticker.
778    fn insert(&mut self, waker: &Waker) -> usize {
779        let id = match self.free_ids.pop() {
780            Some(id) => id,
781            None => self.count + 1,
782        };
783        self.count += 1;
784        self.wakers.push((id, waker.clone()));
785        id
786    }
787
788    /// Re-inserts a sleeping ticker's waker if it was notified.
789    ///
790    /// Returns `true` if the ticker was notified.
791    fn update(&mut self, id: usize, waker: &Waker) -> bool {
792        for item in &mut self.wakers {
793            if item.0 == id {
794                item.1.clone_from(waker);
795                return false;
796            }
797        }
798
799        self.wakers.push((id, waker.clone()));
800        true
801    }
802
803    /// Removes a previously inserted sleeping ticker.
804    ///
805    /// Returns `true` if the ticker was notified.
806    fn remove(&mut self, id: usize) -> bool {
807        self.count -= 1;
808        self.free_ids.push(id);
809
810        for i in (0..self.wakers.len()).rev() {
811            if self.wakers[i].0 == id {
812                self.wakers.remove(i);
813                return false;
814            }
815        }
816        true
817    }
818
819    /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
820    fn is_notified(&self) -> bool {
821        self.count == 0 || self.count > self.wakers.len()
822    }
823
824    /// Returns notification waker for a sleeping ticker.
825    ///
826    /// If a ticker was notified already or there are no tickers, `None` will be returned.
827    fn notify(&mut self) -> Option<Waker> {
828        if self.wakers.len() == self.count {
829            self.wakers.pop().map(|item| item.1)
830        } else {
831            None
832        }
833    }
834}
835
836/// Runs task one by one.
837struct Ticker<'a> {
838    /// The executor state.
839    state: &'a State,
840
841    /// Set to a non-zero sleeper ID when in sleeping state.
842    ///
843    /// States a ticker can be in:
844    /// 1) Woken.
845    ///    2a) Sleeping and unnotified.
846    ///    2b) Sleeping and notified.
847    sleeping: usize,
848}
849
850impl Ticker<'_> {
851    /// Creates a ticker.
852    fn new(state: &State) -> Ticker<'_> {
853        Ticker { state, sleeping: 0 }
854    }
855
856    /// Moves the ticker into sleeping and unnotified state.
857    ///
858    /// Returns `false` if the ticker was already sleeping and unnotified.
859    fn sleep(&mut self, waker: &Waker) -> bool {
860        let mut sleepers = self
861            .state
862            .sleepers
863            .lock()
864            .unwrap_or_else(PoisonError::into_inner);
865
866        match self.sleeping {
867            // Move to sleeping state.
868            0 => {
869                self.sleeping = sleepers.insert(waker);
870            }
871
872            // Already sleeping, check if notified.
873            id => {
874                if !sleepers.update(id, waker) {
875                    return false;
876                }
877            }
878        }
879
880        self.state
881            .notified
882            .store(sleepers.is_notified(), Ordering::Release);
883
884        true
885    }
886
887    /// Moves the ticker into woken state.
888    fn wake(&mut self) {
889        if self.sleeping != 0 {
890            let mut sleepers = self
891                .state
892                .sleepers
893                .lock()
894                .unwrap_or_else(PoisonError::into_inner);
895            sleepers.remove(self.sleeping);
896
897            self.state
898                .notified
899                .store(sleepers.is_notified(), Ordering::Release);
900        }
901        self.sleeping = 0;
902    }
903
904    /// Waits for the next runnable task to run.
905    async fn runnable(&mut self) -> Runnable {
906        self.runnable_with(|| self.state.queue.pop().ok()).await
907    }
908
909    /// Waits for the next runnable task to run, given a function that searches for a task.
910    async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
911        future::poll_fn(|cx| {
912            loop {
913                match search() {
914                    None => {
915                        // Move to sleeping and unnotified state.
916                        if !self.sleep(cx.waker()) {
917                            // If already sleeping and unnotified, return.
918                            return Poll::Pending;
919                        }
920                    }
921                    Some(r) => {
922                        // Wake up.
923                        self.wake();
924
925                        // Notify another ticker now to pick up where this ticker left off, just in
926                        // case running the task takes a long time.
927                        self.state.notify();
928
929                        return Poll::Ready(r);
930                    }
931                }
932            }
933        })
934        .await
935    }
936}
937
938impl Drop for Ticker<'_> {
939    fn drop(&mut self) {
940        // If this ticker is in sleeping state, it must be removed from the sleepers list.
941        if self.sleeping != 0 {
942            let mut sleepers = self
943                .state
944                .sleepers
945                .lock()
946                .unwrap_or_else(PoisonError::into_inner);
947            let notified = sleepers.remove(self.sleeping);
948
949            self.state
950                .notified
951                .store(sleepers.is_notified(), Ordering::Release);
952
953            // If this ticker was notified, then notify another ticker.
954            if notified {
955                drop(sleepers);
956                self.state.notify();
957            }
958        }
959    }
960}
961
962/// A worker in a work-stealing executor.
963///
964/// This is just a ticker that also has an associated local queue for improved cache locality.
965struct Runner<'a> {
966    /// The executor state.
967    state: &'a State,
968
969    /// Inner ticker.
970    ticker: Ticker<'a>,
971
972    /// The local queue.
973    local: Arc<ConcurrentQueue<Runnable>>,
974
975    /// Bumped every time a runnable task is found.
976    ticks: usize,
977}
978
979impl Runner<'_> {
980    /// Creates a runner and registers it in the executor state.
981    fn new(state: &State) -> Runner<'_> {
982        let runner = Runner {
983            state,
984            ticker: Ticker::new(state),
985            local: Arc::new(ConcurrentQueue::bounded(512)),
986            ticks: 0,
987        };
988        state
989            .local_queues
990            .write()
991            .unwrap_or_else(PoisonError::into_inner)
992            .push(runner.local.clone());
993        runner
994    }
995
996    /// Waits for the next runnable task to run.
997    async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
998        let runnable = self
999            .ticker
1000            .runnable_with(|| {
1001                // Try the local queue.
1002                if let Ok(r) = self.local.pop() {
1003                    return Some(r);
1004                }
1005
1006                // Try stealing from the global queue.
1007                if let Ok(r) = self.state.queue.pop() {
1008                    steal(&self.state.queue, &self.local);
1009                    return Some(r);
1010                }
1011
1012                // Try stealing from other runners.
1013                if let Ok(local_queues) = self.state.local_queues.try_read() {
1014                    // Pick a random starting point in the iterator list and rotate the list.
1015                    let n = local_queues.len();
1016                    let start = rng.usize(..n);
1017                    let iter = local_queues
1018                        .iter()
1019                        .chain(local_queues.iter())
1020                        .skip(start)
1021                        .take(n);
1022
1023                    // Remove this runner's local queue.
1024                    let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));
1025
1026                    // Try stealing from each local queue in the list.
1027                    for local in iter {
1028                        steal(local, &self.local);
1029                        if let Ok(r) = self.local.pop() {
1030                            return Some(r);
1031                        }
1032                    }
1033                }
1034
1035                None
1036            })
1037            .await;
1038
1039        // Bump the tick counter.
1040        self.ticks = self.ticks.wrapping_add(1);
1041
1042        if self.ticks % 64 == 0 {
1043            // Steal tasks from the global queue to ensure fair task scheduling.
1044            steal(&self.state.queue, &self.local);
1045        }
1046
1047        runnable
1048    }
1049}
1050
1051impl Drop for Runner<'_> {
1052    fn drop(&mut self) {
1053        // Remove the local queue.
1054        self.state
1055            .local_queues
1056            .write()
1057            .unwrap_or_else(PoisonError::into_inner)
1058            .retain(|local| !Arc::ptr_eq(local, &self.local));
1059
1060        // Re-schedule remaining tasks in the local queue.
1061        while let Ok(r) = self.local.pop() {
1062            r.schedule();
1063        }
1064    }
1065}
1066
1067/// Steals some items from one queue into another.
1068fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
1069    // Half of `src`'s length rounded up.
1070    let mut count = (src.len() + 1) / 2;
1071
1072    if count > 0 {
1073        // Don't steal more than fits into the queue.
1074        if let Some(cap) = dest.capacity() {
1075            count = count.min(cap - dest.len());
1076        }
1077
1078        // Steal tasks.
1079        for _ in 0..count {
1080            if let Ok(t) = src.pop() {
1081                assert!(dest.push(t).is_ok());
1082            } else {
1083                break;
1084            }
1085        }
1086    }
1087}
1088
1089/// Debug implementation for `Executor` and `LocalExecutor`.
1090fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1091    // Get a reference to the state.
1092    let ptr = executor.state.load(Ordering::Acquire);
1093    if ptr.is_null() {
1094        // The executor has not been initialized.
1095        struct Uninitialized;
1096
1097        impl fmt::Debug for Uninitialized {
1098            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1099                f.write_str("<uninitialized>")
1100            }
1101        }
1102
1103        return f.debug_tuple(name).field(&Uninitialized).finish();
1104    }
1105
1106    // SAFETY: If the state pointer is not null, it must have been
1107    // allocated properly by Arc::new and converted via Arc::into_raw
1108    // in state_ptr.
1109    let state = unsafe { &*ptr };
1110
1111    debug_state(state, name, f)
1112}
1113
1114/// Debug implementation for `Executor` and `LocalExecutor`.
1115fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1116    /// Debug wrapper for the number of active tasks.
1117    struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>);
1118
1119    impl fmt::Debug for ActiveTasks<'_> {
1120        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1121            match self.0.try_lock() {
1122                Ok(lock) => fmt::Debug::fmt(&lock.len(), f),
1123                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1124                Err(TryLockError::Poisoned(err)) => fmt::Debug::fmt(&err.into_inner().len(), f),
1125            }
1126        }
1127    }
1128
1129    /// Debug wrapper for the local runners.
1130    struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);
1131
1132    impl fmt::Debug for LocalRunners<'_> {
1133        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1134            match self.0.try_read() {
1135                Ok(lock) => f
1136                    .debug_list()
1137                    .entries(lock.iter().map(|queue| queue.len()))
1138                    .finish(),
1139                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1140                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1141            }
1142        }
1143    }
1144
1145    /// Debug wrapper for the sleepers.
1146    struct SleepCount<'a>(&'a Mutex<Sleepers>);
1147
1148    impl fmt::Debug for SleepCount<'_> {
1149        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1150            match self.0.try_lock() {
1151                Ok(lock) => fmt::Debug::fmt(&lock.count, f),
1152                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1153                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1154            }
1155        }
1156    }
1157
1158    f.debug_struct(name)
1159        .field("active", &ActiveTasks(&state.active))
1160        .field("global_tasks", &state.queue.len())
1161        .field("local_runners", &LocalRunners(&state.local_queues))
1162        .field("sleepers", &SleepCount(&state.sleepers))
1163        .finish()
1164}
1165
1166/// Runs a closure when dropped.
1167struct CallOnDrop<F: FnMut()>(F);
1168
1169impl<F: FnMut()> Drop for CallOnDrop<F> {
1170    fn drop(&mut self) {
1171        (self.0)();
1172    }
1173}
1174
1175pin_project! {
1176    /// A wrapper around a future, running a closure when dropped.
1177    struct AsyncCallOnDrop<Fut, Cleanup: FnMut()> {
1178        #[pin]
1179        future: Fut,
1180        cleanup: CallOnDrop<Cleanup>,
1181    }
1182}
1183
1184impl<Fut, Cleanup: FnMut()> AsyncCallOnDrop<Fut, Cleanup> {
1185    fn new(future: Fut, cleanup: Cleanup) -> Self {
1186        Self {
1187            future,
1188            cleanup: CallOnDrop(cleanup),
1189        }
1190    }
1191}
1192
1193impl<Fut: Future, Cleanup: FnMut()> Future for AsyncCallOnDrop<Fut, Cleanup> {
1194    type Output = Fut::Output;
1195
1196    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1197        self.project().future.poll(cx)
1198    }
1199}
1200
1201fn _ensure_send_and_sync() {
1202    use futures_lite::future::pending;
1203
1204    fn is_send<T: Send>(_: T) {}
1205    fn is_sync<T: Sync>(_: T) {}
1206    fn is_static<T: 'static>(_: T) {}
1207
1208    is_send::<Executor<'_>>(Executor::new());
1209    is_sync::<Executor<'_>>(Executor::new());
1210
1211    let ex = Executor::new();
1212    is_send(ex.run(pending::<()>()));
1213    is_sync(ex.run(pending::<()>()));
1214    is_send(ex.tick());
1215    is_sync(ex.tick());
1216    is_send(ex.schedule());
1217    is_sync(ex.schedule());
1218    is_static(ex.schedule());
1219
1220    /// ```compile_fail
1221    /// use async_executor::LocalExecutor;
1222    /// use futures_lite::future::pending;
1223    ///
1224    /// fn is_send<T: Send>(_: T) {}
1225    /// fn is_sync<T: Sync>(_: T) {}
1226    ///
1227    /// is_send::<LocalExecutor<'_>>(LocalExecutor::new());
1228    /// is_sync::<LocalExecutor<'_>>(LocalExecutor::new());
1229    ///
1230    /// let ex = LocalExecutor::new();
1231    /// is_send(ex.run(pending::<()>()));
1232    /// is_sync(ex.run(pending::<()>()));
1233    /// is_send(ex.tick());
1234    /// is_sync(ex.tick());
1235    /// ```
1236    fn _negative_test() {}
1237}