async_executor/lib.rs
//! Async executors.
//!
//! This crate provides two reference executors that trade performance for
//! functionality. They should be considered "good enough" for most use cases.
//! For more specialized use cases, consider writing your own executor on top
//! of [`async-task`].
//!
//! [`async-task`]: https://crates.io/crates/async-task
//!
//! # Examples
//!
//! ```
//! use async_executor::Executor;
//! use futures_lite::future;
//!
//! // Create a new executor.
//! let ex = Executor::new();
//!
//! // Spawn a task.
//! let task = ex.spawn(async {
//!     println!("Hello world");
//! });
//!
//! // Run the executor until the task completes.
//! future::block_on(ex.run(task));
//! ```
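//!
//! A thread-local variant with the same surface is provided by
//! [`LocalExecutor`]; a minimal sketch of the same flow:
//!
//! ```
//! use async_executor::LocalExecutor;
//! use futures_lite::future;
//!
//! let local_ex = LocalExecutor::new();
//!
//! let task = local_ex.spawn(async { 1 + 2 });
//! assert_eq!(future::block_on(local_ex.run(task)), 3);
//! ```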

#![warn(
    missing_docs,
    missing_debug_implementations,
    rust_2018_idioms,
    clippy::undocumented_unsafe_blocks
)]
#![doc(
    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![doc(
    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![allow(clippy::unused_unit)] // false positive fixed in Rust 1.89

use std::fmt;
use std::marker::PhantomData;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, PoisonError, RwLock, TryLockError};
use std::task::{Context, Poll, Waker};

use async_task::{Builder, Runnable};
use concurrent_queue::ConcurrentQueue;
use futures_lite::{future, prelude::*};
use pin_project_lite::pin_project;
use slab::Slab;

#[cfg(feature = "static")]
mod static_executors;

#[doc(no_inline)]
pub use async_task::{FallibleTask, Task};
#[cfg(feature = "static")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))]
pub use static_executors::*;

/// An async executor.
///
/// # Examples
///
/// A multi-threaded executor:
///
/// ```
/// use async_channel::unbounded;
/// use async_executor::Executor;
/// use easy_parallel::Parallel;
/// use futures_lite::future;
///
/// let ex = Executor::new();
/// let (signal, shutdown) = unbounded::<()>();
///
/// Parallel::new()
///     // Run four executor threads.
///     .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
///     // Run the main future on the current thread.
///     .finish(|| future::block_on(async {
///         println!("Hello world!");
///         drop(signal);
///     }));
/// ```
pub struct Executor<'a> {
    /// The executor state.
    state: AtomicPtr<State>,

    /// Makes the `'a` lifetime invariant.
    _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
}

// SAFETY: Executor stores no thread-local state that could be accessed from other threads.
unsafe impl Send for Executor<'_> {}
// SAFETY: Executor synchronizes all of its operations internally.
unsafe impl Sync for Executor<'_> {}

impl UnwindSafe for Executor<'_> {}
impl RefUnwindSafe for Executor<'_> {}

impl fmt::Debug for Executor<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        debug_executor(self, "Executor", f)
    }
}

impl<'a> Executor<'a> {
    /// Creates a new executor.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    ///
    /// let ex = Executor::new();
    /// ```
    pub const fn new() -> Executor<'a> {
        Executor {
            state: AtomicPtr::new(std::ptr::null_mut()),
            _marker: PhantomData,
        }
    }

    /// Returns `true` if there are no unfinished tasks.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    ///
    /// let ex = Executor::new();
    /// assert!(ex.is_empty());
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// assert!(!ex.is_empty());
    ///
    /// assert!(ex.try_tick());
    /// assert!(ex.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.state().active().is_empty()
    }

    /// Spawns a task onto the executor.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    ///
    /// let ex = Executor::new();
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// ```
    pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
        let mut active = self.state().active();

        // SAFETY: `T` and the future are `Send`.
        unsafe { self.spawn_inner(future, &mut active) }
    }

    /// Spawns many tasks onto the executor.
    ///
    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
    /// spawns all of the tasks in one go. With large numbers of tasks this can reduce
    /// contention.
    ///
    /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
    /// prevent runner thread starvation. It is assumed that the iterator provided does not
    /// block; blocking iterators can lock up the internal mutex and therefore the entire
    /// executor.
    ///
    /// ## Example
    ///
    /// ```
    /// use async_executor::Executor;
    /// use futures_lite::{stream, prelude::*};
    /// use std::future::ready;
    ///
    /// # futures_lite::future::block_on(async {
    /// let mut ex = Executor::new();
    ///
    /// let futures = [
    ///     ready(1),
    ///     ready(2),
    ///     ready(3)
    /// ];
    ///
    /// // Spawn all of the futures onto the executor at once.
    /// let mut tasks = vec![];
    /// ex.spawn_many(futures, &mut tasks);
    ///
    /// // Await all of them.
    /// let results = ex.run(async move {
    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
    /// }).await;
    /// assert_eq!(results, [1, 2, 3]);
    /// # });
    /// ```
    ///
    /// [`spawn`]: Executor::spawn
    pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
        &self,
        futures: impl IntoIterator<Item = F>,
        handles: &mut impl Extend<Task<F::Output>>,
    ) {
        let mut active = Some(self.state().active());

        // Convert the futures into tasks.
        let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
            // SAFETY: `T` and the future are `Send`.
            let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };

            // Yield the lock every once in a while to ease contention.
            if i.wrapping_sub(1) % 500 == 0 {
                drop(active.take());
                active = Some(self.state().active());
            }

            task
        });

        // Push the tasks to the user's collection.
        handles.extend(tasks);
    }

    /// Spawn a future while holding the inner lock.
    ///
    /// # Safety
    ///
    /// If this is an `Executor`, the future and `T` must be `Send`.
    unsafe fn spawn_inner<T: 'a>(
        &self,
        future: impl Future<Output = T> + 'a,
        active: &mut Slab<Waker>,
    ) -> Task<T> {
        // Remove the task from the set of active tasks when the future finishes.
        let entry = active.vacant_entry();
        let index = entry.key();
        let state = self.state_as_arc();
        let future = AsyncCallOnDrop::new(future, move || drop(state.active().try_remove(index)));

        // Create the task and register it in the set of active tasks.
        //
        // SAFETY:
        //
        // If `future` is not `Send`, this must be a `LocalExecutor` as per this
        // function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
        // `try_tick`, `tick` and `run` can only be called from the origin
        // thread of the `LocalExecutor`. Similarly, `spawn` can only be called
        // from the origin thread, ensuring that `future` and the executor share
        // the same origin thread. The `Runnable` can be scheduled from other
        // threads, but because of the above `Runnable` can only be called or
        // dropped on the origin thread.
        //
        // `future` is not `'static`, but we make sure that the `Runnable` does
        // not outlive `'a`. When the executor is dropped, the `active` field is
        // drained and all of the `Waker`s are woken. Then, the queue inside of
        // the `Executor` is drained of all of its runnables. This ensures that
        // runnables are dropped and this precondition is satisfied.
        //
        // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
        // Therefore we do not need to worry about what is done with the
        // `Waker`.
        let (runnable, task) = Builder::new()
            .propagate_panic(true)
            .spawn_unchecked(|()| future, self.schedule());
        entry.insert(runnable.waker());

        runnable.schedule();
        task
    }

    /// Attempts to run a task if at least one is scheduled.
    ///
    /// Running a scheduled task means simply polling its future once.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    ///
    /// let ex = Executor::new();
    /// assert!(!ex.try_tick()); // no tasks to run
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// assert!(ex.try_tick()); // a task was found
    /// ```
    pub fn try_tick(&self) -> bool {
        self.state().try_tick()
    }

    /// Runs a single task.
    ///
    /// Running a task means simply polling its future once.
    ///
    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    /// use futures_lite::future;
    ///
    /// let ex = Executor::new();
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// future::block_on(ex.tick()); // runs the task
    /// ```
    pub async fn tick(&self) {
        self.state().tick().await;
    }

    /// Runs the executor until the given future completes.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    /// use futures_lite::future;
    ///
    /// let ex = Executor::new();
    ///
    /// let task = ex.spawn(async { 1 + 2 });
    /// let res = future::block_on(ex.run(async { task.await * 2 }));
    ///
    /// assert_eq!(res, 6);
    /// ```
    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
        self.state().run(future).await
    }

    /// Returns a function that schedules a runnable task when it gets woken up.
    fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
        let state = self.state_as_arc();

        // TODO: If possible, push into the current local queue and notify the ticker.
        move |runnable| {
            let result = state.queue.push(runnable);
            debug_assert!(result.is_ok()); // Since the queue is unbounded, the push never fails.
            state.notify();
        }
    }

    /// Returns a pointer to the inner state.
    #[inline]
    fn state_ptr(&self) -> *const State {
        #[cold]
        fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State {
            let state = Arc::new(State::new());
            // TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65
            let ptr = Arc::into_raw(state) as *mut State;
            if let Err(actual) = atomic_ptr.compare_exchange(
                std::ptr::null_mut(),
                ptr,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                // Another thread installed the state first; free ours and use theirs.
                // SAFETY: This was just created from Arc::into_raw.
                drop(unsafe { Arc::from_raw(ptr) });
                actual
            } else {
                ptr
            }
        }

        let mut ptr = self.state.load(Ordering::Acquire);
        if ptr.is_null() {
            ptr = alloc_state(&self.state);
        }
        ptr
    }

    /// Returns a reference to the inner state.
    #[inline]
    fn state(&self) -> &State {
        // SAFETY: So long as an Executor lives, its state pointer will always be valid
        // when accessed through state_ptr.
        unsafe { &*self.state_ptr() }
    }

    // Clones the inner state Arc.
    #[inline]
    fn state_as_arc(&self) -> Arc<State> {
        // SAFETY: So long as an Executor lives, its state pointer will always be a valid
        // Arc when accessed through state_ptr.
        let arc = unsafe { Arc::from_raw(self.state_ptr()) };
        let clone = arc.clone();
        std::mem::forget(arc);
        clone
    }
}

impl Drop for Executor<'_> {
    fn drop(&mut self) {
        let ptr = *self.state.get_mut();
        if ptr.is_null() {
            return;
        }

        // SAFETY: As ptr is not null, it was allocated via Arc::new and converted
        // via Arc::into_raw in state_ptr.
        let state = unsafe { Arc::from_raw(ptr) };

        // Wake all active tasks and drop their wakers.
        let mut active = state.active();
        for w in active.drain() {
            w.wake();
        }
        drop(active);

        // Drain the scheduled runnables so they are dropped before `'a` ends.
        while state.queue.pop().is_ok() {}
    }
}

impl<'a> Default for Executor<'a> {
    fn default() -> Executor<'a> {
        Executor::new()
    }
}

/// A thread-local executor.
///
/// The executor can only be run on the thread that created it.
///
/// # Examples
///
/// ```
/// use async_executor::LocalExecutor;
/// use futures_lite::future;
///
/// let local_ex = LocalExecutor::new();
///
/// future::block_on(local_ex.run(async {
///     println!("Hello world!");
/// }));
/// ```
pub struct LocalExecutor<'a> {
    /// The inner executor.
    inner: Executor<'a>,

    /// Makes the type `!Send` and `!Sync`.
    _marker: PhantomData<Rc<()>>,
}

impl UnwindSafe for LocalExecutor<'_> {}
impl RefUnwindSafe for LocalExecutor<'_> {}

impl fmt::Debug for LocalExecutor<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        debug_executor(&self.inner, "LocalExecutor", f)
    }
}

impl<'a> LocalExecutor<'a> {
    /// Creates a single-threaded executor.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    ///
    /// let local_ex = LocalExecutor::new();
    /// ```
    pub const fn new() -> LocalExecutor<'a> {
        LocalExecutor {
            inner: Executor::new(),
            _marker: PhantomData,
        }
    }

    /// Returns `true` if there are no unfinished tasks.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    ///
    /// let local_ex = LocalExecutor::new();
    /// assert!(local_ex.is_empty());
    ///
    /// let task = local_ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// assert!(!local_ex.is_empty());
    ///
    /// assert!(local_ex.try_tick());
    /// assert!(local_ex.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.inner().is_empty()
    }

    /// Spawns a task onto the executor.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    ///
    /// let local_ex = LocalExecutor::new();
    ///
    /// let task = local_ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// ```
    pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
        let mut active = self.inner().state().active();

        // SAFETY: This executor is not thread safe, so the future and its result
        // cannot be sent to another thread.
        unsafe { self.inner().spawn_inner(future, &mut active) }
    }

    /// Spawns many tasks onto the executor.
    ///
    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
    /// spawns all of the tasks in one go. With large numbers of tasks this can reduce
    /// contention.
    ///
    /// It is assumed that the iterator provided does not block; blocking iterators can lock up
    /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn_many`], the
    /// mutex is not released, as there are no other threads that can poll this executor.
    ///
    /// ## Example
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    /// use futures_lite::{stream, prelude::*};
    /// use std::future::ready;
    ///
    /// # futures_lite::future::block_on(async {
    /// let mut ex = LocalExecutor::new();
    ///
    /// let futures = [
    ///     ready(1),
    ///     ready(2),
    ///     ready(3)
    /// ];
    ///
    /// // Spawn all of the futures onto the executor at once.
    /// let mut tasks = vec![];
    /// ex.spawn_many(futures, &mut tasks);
    ///
    /// // Await all of them.
    /// let results = ex.run(async move {
    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
    /// }).await;
    /// assert_eq!(results, [1, 2, 3]);
    /// # });
    /// ```
    ///
    /// [`spawn`]: LocalExecutor::spawn
    /// [`Executor::spawn_many`]: Executor::spawn_many
    pub fn spawn_many<T: 'a, F: Future<Output = T> + 'a>(
        &self,
        futures: impl IntoIterator<Item = F>,
        handles: &mut impl Extend<Task<F::Output>>,
    ) {
        let mut active = self.inner().state().active();

        // Convert all of the futures to tasks.
        let tasks = futures.into_iter().map(|future| {
            // SAFETY: This executor is not thread safe, so the future and its result
            // cannot be sent to another thread.
            unsafe { self.inner().spawn_inner(future, &mut active) }

            // As only one thread can spawn or poll tasks at a time, there is no need
            // to release the lock to ease contention here.
        });

        // Push them to the user's collection.
        handles.extend(tasks);
    }

    /// Attempts to run a task if at least one is scheduled.
    ///
    /// Running a scheduled task means simply polling its future once.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    ///
    /// let ex = LocalExecutor::new();
    /// assert!(!ex.try_tick()); // no tasks to run
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// assert!(ex.try_tick()); // a task was found
    /// ```
    pub fn try_tick(&self) -> bool {
        self.inner().try_tick()
    }

    /// Runs a single task.
    ///
    /// Running a task means simply polling its future once.
    ///
    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    /// use futures_lite::future;
    ///
    /// let ex = LocalExecutor::new();
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// future::block_on(ex.tick()); // runs the task
    /// ```
    pub async fn tick(&self) {
        self.inner().tick().await
    }

    /// Runs the executor until the given future completes.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    /// use futures_lite::future;
    ///
    /// let local_ex = LocalExecutor::new();
    ///
    /// let task = local_ex.spawn(async { 1 + 2 });
    /// let res = future::block_on(local_ex.run(async { task.await * 2 }));
    ///
    /// assert_eq!(res, 6);
    /// ```
    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
        self.inner().run(future).await
    }

    /// Returns a reference to the inner executor.
    fn inner(&self) -> &Executor<'a> {
        &self.inner
    }
}

impl<'a> Default for LocalExecutor<'a> {
    fn default() -> LocalExecutor<'a> {
        LocalExecutor::new()
    }
}

/// The state of an executor.
struct State {
    /// The global queue.
    queue: ConcurrentQueue<Runnable>,

    /// Local queues created by runners.
    local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,

    /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
    notified: AtomicBool,

    /// A list of sleeping tickers.
    sleepers: Mutex<Sleepers>,

    /// Currently active tasks.
    active: Mutex<Slab<Waker>>,
}

impl State {
    /// Creates state for a new executor.
    const fn new() -> State {
        State {
            queue: ConcurrentQueue::unbounded(),
            local_queues: RwLock::new(Vec::new()),
            notified: AtomicBool::new(true),
            sleepers: Mutex::new(Sleepers {
                count: 0,
                wakers: Vec::new(),
                free_ids: Vec::new(),
            }),
            active: Mutex::new(Slab::new()),
        }
    }

    /// Returns a reference to currently active tasks.
    fn active(&self) -> MutexGuard<'_, Slab<Waker>> {
        self.active.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Notifies a sleeping ticker.
    #[inline]
    fn notify(&self) {
        if self
            .notified
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            let waker = self
                .sleepers
                .lock()
                .unwrap_or_else(PoisonError::into_inner)
                .notify();
            if let Some(w) = waker {
                w.wake();
            }
        }
    }

    pub(crate) fn try_tick(&self) -> bool {
        match self.queue.pop() {
            Err(_) => false,
            Ok(runnable) => {
                // Notify another ticker now to pick up where this ticker left off, just in case
                // running the task takes a long time.
                self.notify();

                // Run the task.
                runnable.run();
                true
            }
        }
    }

    pub(crate) async fn tick(&self) {
        let runnable = Ticker::new(self).runnable().await;
        runnable.run();
    }

    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
        let mut runner = Runner::new(self);
        let mut rng = fastrand::Rng::new();

        // A future that runs tasks forever.
        let run_forever = async {
            loop {
                for _ in 0..200 {
                    let runnable = runner.runnable(&mut rng).await;
                    runnable.run();
                }
                future::yield_now().await;
            }
        };

        // Run `future` and `run_forever` concurrently until `future` completes.
        future.or(run_forever).await
    }
}

/// A list of sleeping tickers.
struct Sleepers {
    /// Number of sleeping tickers (both notified and unnotified).
    count: usize,

    /// IDs and wakers of sleeping unnotified tickers.
    ///
    /// A sleeping ticker is notified when its waker is missing from this list.
    wakers: Vec<(usize, Waker)>,

    /// Reclaimed IDs.
    free_ids: Vec<usize>,
}

impl Sleepers {
    /// Inserts a new sleeping ticker.
    fn insert(&mut self, waker: &Waker) -> usize {
        let id = match self.free_ids.pop() {
            Some(id) => id,
            None => self.count + 1,
        };
        self.count += 1;
        self.wakers.push((id, waker.clone()));
        id
    }

    /// Re-inserts a sleeping ticker's waker if it was notified.
    ///
    /// Returns `true` if the ticker was notified.
    fn update(&mut self, id: usize, waker: &Waker) -> bool {
        for item in &mut self.wakers {
            if item.0 == id {
                item.1.clone_from(waker);
                return false;
            }
        }

        self.wakers.push((id, waker.clone()));
        true
    }

    /// Removes a previously inserted sleeping ticker.
    ///
    /// Returns `true` if the ticker was notified.
    fn remove(&mut self, id: usize) -> bool {
        self.count -= 1;
        self.free_ids.push(id);

        for i in (0..self.wakers.len()).rev() {
            if self.wakers[i].0 == id {
                self.wakers.remove(i);
                return false;
            }
        }
        true
    }

    /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
    fn is_notified(&self) -> bool {
        self.count == 0 || self.count > self.wakers.len()
    }

    /// Returns the notification waker for a sleeping ticker.
    ///
    /// If a ticker was notified already or there are no tickers, `None` will be returned.
    fn notify(&mut self) -> Option<Waker> {
        if self.wakers.len() == self.count {
            self.wakers.pop().map(|item| item.1)
        } else {
            None
        }
    }
}
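
// The notification protocol above is subtle: a sleeper counts as notified
// exactly when its waker is missing from `wakers`. The sketch below (not part
// of the original sources) exercises that invariant directly, borrowing a real
// `Waker` through `future::poll_fn`.
#[cfg(test)]
mod sleepers_invariant_sketch {
    use super::Sleepers;
    use futures_lite::future;
    use std::task::Poll;

    #[test]
    fn notify_removes_the_waker() {
        future::block_on(future::poll_fn(|cx| {
            let mut sleepers = Sleepers {
                count: 0,
                wakers: Vec::new(),
                free_ids: Vec::new(),
            };

            // One sleeping, unnotified ticker.
            let id = sleepers.insert(cx.waker());
            assert!(!sleepers.is_notified());

            // Notifying it removes its waker, which marks it as notified.
            assert!(sleepers.notify().is_some());
            assert!(sleepers.is_notified());

            // Removing a notified ticker reports that it was notified.
            assert!(sleepers.remove(id));

            Poll::Ready(())
        }));
    }
}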

/// Runs tasks one by one.
struct Ticker<'a> {
    /// The executor state.
    state: &'a State,

    /// Set to a non-zero sleeper ID when in sleeping state.
    ///
    /// States a ticker can be in:
    /// 1) Woken.
    /// 2a) Sleeping and unnotified.
    /// 2b) Sleeping and notified.
    sleeping: usize,
}

impl Ticker<'_> {
    /// Creates a ticker.
    fn new(state: &State) -> Ticker<'_> {
        Ticker { state, sleeping: 0 }
    }

    /// Moves the ticker into sleeping and unnotified state.
    ///
    /// Returns `false` if the ticker was already sleeping and unnotified.
    fn sleep(&mut self, waker: &Waker) -> bool {
        let mut sleepers = self
            .state
            .sleepers
            .lock()
            .unwrap_or_else(PoisonError::into_inner);

        match self.sleeping {
            // Move to sleeping state.
            0 => {
                self.sleeping = sleepers.insert(waker);
            }

            // Already sleeping, check if notified.
            id => {
                if !sleepers.update(id, waker) {
                    return false;
                }
            }
        }

        self.state
            .notified
            .store(sleepers.is_notified(), Ordering::Release);

        true
    }

    /// Moves the ticker into woken state.
    fn wake(&mut self) {
        if self.sleeping != 0 {
            let mut sleepers = self
                .state
                .sleepers
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            sleepers.remove(self.sleeping);

            self.state
                .notified
                .store(sleepers.is_notified(), Ordering::Release);
        }
        self.sleeping = 0;
    }

    /// Waits for the next runnable task to run.
    async fn runnable(&mut self) -> Runnable {
        self.runnable_with(|| self.state.queue.pop().ok()).await
    }

    /// Waits for the next runnable task to run, given a function that searches for a task.
    async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
        future::poll_fn(|cx| {
            loop {
                match search() {
                    None => {
                        // Move to sleeping and unnotified state.
                        if !self.sleep(cx.waker()) {
                            // If already sleeping and unnotified, return.
                            return Poll::Pending;
                        }
                    }
                    Some(r) => {
                        // Wake up.
                        self.wake();

                        // Notify another ticker now to pick up where this ticker left off, just in
                        // case running the task takes a long time.
                        self.state.notify();

                        return Poll::Ready(r);
                    }
                }
            }
        })
        .await
    }
}

impl Drop for Ticker<'_> {
    fn drop(&mut self) {
        // If this ticker is in sleeping state, it must be removed from the sleepers list.
        if self.sleeping != 0 {
            let mut sleepers = self
                .state
                .sleepers
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            let notified = sleepers.remove(self.sleeping);

            self.state
                .notified
                .store(sleepers.is_notified(), Ordering::Release);

            // If this ticker was notified, then notify another ticker.
            if notified {
                drop(sleepers);
                self.state.notify();
            }
        }
    }
}

/// A worker in a work-stealing executor.
///
/// This is just a ticker that also has an associated local queue for improved cache locality.
struct Runner<'a> {
    /// The executor state.
    state: &'a State,

    /// Inner ticker.
    ticker: Ticker<'a>,

    /// The local queue.
    local: Arc<ConcurrentQueue<Runnable>>,

    /// Bumped every time a runnable task is found.
    ticks: usize,
}

impl Runner<'_> {
    /// Creates a runner and registers it in the executor state.
    fn new(state: &State) -> Runner<'_> {
        let runner = Runner {
            state,
            ticker: Ticker::new(state),
            local: Arc::new(ConcurrentQueue::bounded(512)),
            ticks: 0,
        };
        state
            .local_queues
            .write()
            .unwrap_or_else(PoisonError::into_inner)
            .push(runner.local.clone());
        runner
    }

    /// Waits for the next runnable task to run.
    async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
        let runnable = self
            .ticker
            .runnable_with(|| {
                // Try the local queue.
                if let Ok(r) = self.local.pop() {
                    return Some(r);
                }

                // Try stealing from the global queue.
                if let Ok(r) = self.state.queue.pop() {
                    steal(&self.state.queue, &self.local);
                    return Some(r);
                }

                // Try stealing from other runners.
                if let Ok(local_queues) = self.state.local_queues.try_read() {
                    // Pick a random starting point in the iterator list and rotate the list.
                    let n = local_queues.len();
                    let start = rng.usize(..n);
                    let iter = local_queues
                        .iter()
                        .chain(local_queues.iter())
                        .skip(start)
                        .take(n);

                    // Remove this runner's local queue.
                    let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));

                    // Try stealing from each local queue in the list.
                    for local in iter {
                        steal(local, &self.local);
                        if let Ok(r) = self.local.pop() {
                            return Some(r);
                        }
                    }
                }

                None
            })
            .await;

        // Bump the tick counter.
        self.ticks = self.ticks.wrapping_add(1);

        if self.ticks % 64 == 0 {
            // Steal tasks from the global queue to ensure fair task scheduling.
            steal(&self.state.queue, &self.local);
        }

        runnable
    }
}

impl Drop for Runner<'_> {
    fn drop(&mut self) {
        // Remove the local queue.
        self.state
            .local_queues
            .write()
            .unwrap_or_else(PoisonError::into_inner)
            .retain(|local| !Arc::ptr_eq(local, &self.local));

        // Re-schedule remaining tasks in the local queue.
        while let Ok(r) = self.local.pop() {
            r.schedule();
        }
    }
}

/// Steals some items from one queue into another.
fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
    // Half of `src`'s length rounded up.
    let mut count = (src.len() + 1) / 2;

    if count > 0 {
        // Don't steal more than fits into the queue.
        if let Some(cap) = dest.capacity() {
            count = count.min(cap - dest.len());
        }

        // Steal tasks.
        for _ in 0..count {
            if let Ok(t) = src.pop() {
                assert!(dest.push(t).is_ok());
            } else {
                break;
            }
        }
    }
}
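
// Illustrative sketch (not part of the original sources): checks `steal`'s
// "take half of the source, but never more than the destination can hold"
// behavior using plain integers in place of runnables.
#[cfg(test)]
mod steal_sketch {
    use super::steal;
    use concurrent_queue::ConcurrentQueue;

    #[test]
    fn steals_half_capped_by_destination_capacity() {
        let src = ConcurrentQueue::unbounded();
        for i in 0..10 {
            src.push(i).unwrap();
        }

        // Half of 10 is 5, but the destination only has room for 3.
        let dest = ConcurrentQueue::bounded(3);
        steal(&src, &dest);

        assert_eq!(dest.len(), 3);
        assert_eq!(src.len(), 7);
    }
}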

/// Debug implementation for `Executor` and `LocalExecutor`.
fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    // Get a reference to the state.
    let ptr = executor.state.load(Ordering::Acquire);
    if ptr.is_null() {
        // The executor has not been initialized.
        struct Uninitialized;

        impl fmt::Debug for Uninitialized {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.write_str("<uninitialized>")
            }
        }

        return f.debug_tuple(name).field(&Uninitialized).finish();
    }

    // SAFETY: If the state pointer is not null, it must have been
    // allocated properly by Arc::new and converted via Arc::into_raw
    // in state_ptr.
    let state = unsafe { &*ptr };

    debug_state(state, name, f)
}
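
// Illustrative sketch (not part of the original sources): the executor state is
// allocated lazily by `state_ptr`, so a freshly created executor debug-prints as
// `<uninitialized>` until the first operation touches it.
#[cfg(test)]
mod lazy_state_sketch {
    use super::Executor;

    #[test]
    fn state_is_allocated_on_first_use() {
        let ex = Executor::new();
        assert_eq!(format!("{:?}", ex), "Executor(<uninitialized>)");

        // Any call that reaches the state (here, an empty tick) allocates it.
        assert!(!ex.try_tick());
        assert!(format!("{:?}", ex).starts_with("Executor {"));
    }
}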

/// Debug implementation for `Executor` and `LocalExecutor`.
fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    /// Debug wrapper for the number of active tasks.
    struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>);

    impl fmt::Debug for ActiveTasks<'_> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self.0.try_lock() {
                Ok(lock) => fmt::Debug::fmt(&lock.len(), f),
                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
                Err(TryLockError::Poisoned(err)) => fmt::Debug::fmt(&err.into_inner().len(), f),
            }
        }
    }

    /// Debug wrapper for the local runners.
    struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);

    impl fmt::Debug for LocalRunners<'_> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self.0.try_read() {
                Ok(lock) => f
                    .debug_list()
                    .entries(lock.iter().map(|queue| queue.len()))
                    .finish(),
                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
            }
        }
    }

    /// Debug wrapper for the sleepers.
    struct SleepCount<'a>(&'a Mutex<Sleepers>);

    impl fmt::Debug for SleepCount<'_> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self.0.try_lock() {
                Ok(lock) => fmt::Debug::fmt(&lock.count, f),
                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
            }
        }
    }

    f.debug_struct(name)
        .field("active", &ActiveTasks(&state.active))
        .field("global_tasks", &state.queue.len())
        .field("local_runners", &LocalRunners(&state.local_queues))
        .field("sleepers", &SleepCount(&state.sleepers))
        .finish()
}

/// Runs a closure when dropped.
struct CallOnDrop<F: FnMut()>(F);

impl<F: FnMut()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        (self.0)();
    }
}

pin_project! {
    /// A wrapper around a future, running a closure when dropped.
    struct AsyncCallOnDrop<Fut, Cleanup: FnMut()> {
        #[pin]
        future: Fut,
        cleanup: CallOnDrop<Cleanup>,
    }
}

impl<Fut, Cleanup: FnMut()> AsyncCallOnDrop<Fut, Cleanup> {
    fn new(future: Fut, cleanup: Cleanup) -> Self {
        Self {
            future,
            cleanup: CallOnDrop(cleanup),
        }
    }
}

impl<Fut: Future, Cleanup: FnMut()> Future for AsyncCallOnDrop<Fut, Cleanup> {
    type Output = Fut::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.project().future.poll(cx)
    }
}
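
// Illustrative sketch (not part of the original sources): the cleanup closure
// wrapped by `AsyncCallOnDrop` runs when the future is dropped, even if the
// future never completed. This is what removes a task from `active` above.
#[cfg(test)]
mod async_call_on_drop_sketch {
    use super::AsyncCallOnDrop;
    use std::cell::Cell;
    use std::rc::Rc;

    #[test]
    fn cleanup_runs_on_drop() {
        let cleaned_up = Rc::new(Cell::new(false));
        let flag = cleaned_up.clone();

        // Wrap a future that is never polled, then drop it.
        let wrapped = AsyncCallOnDrop::new(async {}, move || flag.set(true));
        drop(wrapped);

        assert!(cleaned_up.get());
    }
}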

fn _ensure_send_and_sync() {
    use futures_lite::future::pending;

    fn is_send<T: Send>(_: T) {}
    fn is_sync<T: Sync>(_: T) {}
    fn is_static<T: 'static>(_: T) {}

    is_send::<Executor<'_>>(Executor::new());
    is_sync::<Executor<'_>>(Executor::new());

    let ex = Executor::new();
    is_send(ex.run(pending::<()>()));
    is_sync(ex.run(pending::<()>()));
    is_send(ex.tick());
    is_sync(ex.tick());
    is_send(ex.schedule());
    is_sync(ex.schedule());
    is_static(ex.schedule());

    /// ```compile_fail
    /// use async_executor::LocalExecutor;
    /// use futures_lite::future::pending;
    ///
    /// fn is_send<T: Send>(_: T) {}
    /// fn is_sync<T: Sync>(_: T) {}
    ///
    /// is_send::<LocalExecutor<'_>>(LocalExecutor::new());
    /// is_sync::<LocalExecutor<'_>>(LocalExecutor::new());
    ///
    /// let ex = LocalExecutor::new();
    /// is_send(ex.run(pending::<()>()));
    /// is_sync(ex.run(pending::<()>()));
    /// is_send(ex.tick());
    /// is_sync(ex.tick());
    /// ```
    fn _negative_test() {}
1237}