local_runtime/
lib.rs

#![warn(missing_docs)]

//! Thread-local async runtime
//!
//! This crate provides an async runtime that runs entirely within the current thread. As such, it
//! can run futures that are `!Send` and non-`static`. If no future is able to make progress, the
//! runtime will suspend the current thread until a future is ready to be polled.
//!
//! To actually run a future, see [`block_on`] or [`Executor::block_on`], which drives the future
//! to completion on the current thread.
//!
//! In addition, this crate provides [async timers](crate::time) and an [async adapter](Async)
//! for standard I/O types, similar to
//! [`async-io`](https://docs.rs/async-io/latest/async_io/index.html).
//!
//! # Implementation
//!
//! Task wakeups are handled by a thread-local reactor, which keeps track of all I/O events and
//! timers in the current thread along with their associated wakers. Waiting for the reactor is
//! done by [`block_on`], without needing a separate thread.
//!
//! The implementation of the reactor depends on the platform. On Unix systems, the reactor uses
//! [`poll`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/poll.html). Currently,
//! Windows is not supported.
//!
//! # Concurrency
//!
//! The [`Executor`] can spawn tasks that run concurrently on the same thread. Alternatively, this
//! crate provides macros such as [`join`] and [`merge_futures`] for concurrent execution.
//!
//! # Compatibility
//!
//! Unlike other runtimes, `local_runtime` doesn't run the reactor in the background, instead
//! relying on [`block_on`] to run the reactor while polling the future. Since leaf futures from
//! this crate, such as [`Async`] and timers, rely on the reactor to wake them up, **they can only
//! be driven by [`block_on`], and are not compatible with other runtimes**.
//!
//! # Examples
//!
//! Listen for connections on a local port while concurrently making connections to localhost.
//! Return with an error if any operation fails.
//!
//! ```no_run
//! use std::{net::{TcpStream, TcpListener}, time::Duration, io};
//! use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
//! use local_runtime::{io::Async, time::sleep, Executor};
//!
//! # fn main() -> std::io::Result<()> {
//! let ex = Executor::new();
//! ex.block_on(async {
//!     let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
//!     let addr = listener.get_ref().local_addr()?;
//!
//!     // Run this task in the background
//!     let _bg = ex.spawn(async move {
//!         // Listen for connections on the local port
//!         loop {
//!             let (mut stream, _) = listener.accept().await?;
//!             let mut buf = [0u8; 5];
//!             stream.read_exact(&mut buf).await?;
//!             assert_eq!(&buf, b"hello");
//!         }
//!         Ok::<_, io::Error>(())
//!     });
//!
//!     // Connect to the listener repeatedly with a 500µs delay
//!     loop {
//!         let mut stream = Async::<TcpStream>::connect(addr).await?;
//!         stream.write_all(b"hello").await?;
//!         sleep(Duration::from_micros(500)).await;
//!     }
//!     Ok::<_, io::Error>(())
//! })?;
//! # Ok(())
//! # }
//! ```

mod concurrency;
pub mod io;
mod reactor;
#[cfg(test)]
mod test;
pub mod time;

use std::{
    cell::{Cell, RefCell, UnsafeCell},
    collections::VecDeque,
    fmt::Debug,
    future::{poll_fn, Future},
    num::NonZero,
    pin::{pin, Pin},
    rc::Rc,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    task::{Context, Poll, Wake, Waker},
    thread::{self, ThreadId},
};

use atomic_waker::AtomicWaker;
use concurrent_queue::ConcurrentQueue;
use futures_core::future::LocalBoxFuture;
use slab::Slab;

#[doc(hidden)]
pub use concurrency::{JoinFuture, MergeFutureStream, MergeStream};
pub use io::Async;
use reactor::{Notifier, REACTOR};

// Option<Id> will be the same size as `usize`
#[repr(transparent)]
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Ord, Eq, Hash)]
struct Id(NonZero<usize>);

impl Id {
    const fn new(n: usize) -> Self {
        Id(NonZero::new(n).expect("expected non-zero ID"))
    }

    const fn overflowing_incr(&self) -> Self {
        // Wrap back around to 1 on overflow
        match self.0.checked_add(1) {
            Some(next) => Self(next),
            None => const { Id::new(1) },
        }
    }
}

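// `block_on` uses the reactor's notifier as its waker: waking it interrupts the reactor's
// `wait` call so that the future gets polled again.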
impl Wake for Notifier {
    fn wake(self: Arc<Self>) {
        let _ = self.notify();
    }
}

/// Drives a future to completion on the current thread, processing I/O events when idle.
///
/// Does not support task spawning (see [`Executor::run`]).
///
/// # Example
///
/// ```
/// use std::time::Duration;
/// use local_runtime::time::Timer;
///
/// local_runtime::block_on(async {
///     Timer::delay(Duration::from_millis(10)).await;
/// });
/// ```
pub fn block_on<T, F>(fut: F) -> T
where
    F: Future<Output = T>,
{
    let mut fut = pin!(fut);
    let waker = REACTOR.with(|r| r.notifier()).into();

    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut Context::from_waker(&waker)) {
            return out;
        }

        let wait_res = REACTOR.with(|r| r.wait());
        if let Err(err) = wait_res {
            log::error!(
                "{:?} Error polling reactor: {err}",
                std::thread::current().id()
            );
        }
    }
}

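// Queue of IDs for tasks that have been awoken, shared between the executor and its task wakers.
// Wakeups from the executor's own thread go through the unsynchronized `local` queue, while
// wakeups from other threads fall back to the `concurrent` queue. `base_waker` is the waker of
// whatever is driving `Executor::run`, and is woken whenever a task ID is pushed.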
#[derive(Debug)]
struct WakeQueue {
    base_waker: AtomicWaker,
    local_thread: ThreadId,
    local: UnsafeCell<VecDeque<usize>>,
    concurrent: ConcurrentQueue<usize>,
}

// SAFETY: The thread-unsafety comes from `local`, which will only be accessed if the current
// thread equals `local_thread`. As such, `local` will never be accessed from multiple threads.
unsafe impl Send for WakeQueue {}
unsafe impl Sync for WakeQueue {}

impl WakeQueue {
    fn with_capacity(capacity: usize) -> Self {
        Self {
            base_waker: AtomicWaker::new(),
            local_thread: thread::current().id(),
            local: UnsafeCell::new(VecDeque::with_capacity(capacity)),
            concurrent: ConcurrentQueue::unbounded(),
        }
    }

    fn push(&self, val: usize) {
        if thread::current().id() == self.local_thread {
            // SAFETY: Like all other accesses to `local`, this access can only happen if the
            // current thread is `local_thread`, and also has a limited lifetime. As such, this
            // access will never cause a data race or collide with any other access of `local`.
            unsafe { (*self.local.get()).push_back(val) };
        } else {
            // If the queue is closed, just don't do anything
            let _ = self.concurrent.push(val);
        }
    }

    fn drain_for_each<F: FnMut(usize)>(&self, mut f: F) {
        if thread::current().id() == self.local_thread {
            // SAFETY: Like all other accesses to `local`, this access can only happen if the
            // current thread is `local_thread`, and also has a limited lifetime. As such, this
            // access will never cause a data race or collide with any other access of `local`.
            let local_len = unsafe { (*self.local.get()).len() };
            let con_len = self.concurrent.len();

            log::trace!(
                "{:?} {local_len} local wakeups, {con_len} concurrent wakeups",
                std::thread::current().id()
            );

            // Set upper bounds for the iteration on the two queues to ensure we never loop forever
            // if the callback also adds values to the queue
            for _ in 0..local_len {
                let val = unsafe { (*self.local.get()).pop_front().unwrap() };
                f(val);
            }
            for val in self.concurrent.try_iter().take(con_len) {
                f(val);
            }
        }
    }

    fn reset(&self, init_val: usize) {
        if thread::current().id() == self.local_thread {
            // SAFETY: Like all other accesses to `local`, this access can only happen if the
            // current thread is `local_thread`, and also has a limited lifetime. As such, this
            // access will never cause a data race or collide with any other access of `local`.
            unsafe {
                (*self.local.get()).clear();
                (*self.local.get()).push_back(init_val);
            }
            // Pop all remaining elements
            while self.concurrent.pop().is_ok() {}
        }
    }
}

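// Per-task waker that pushes its task's ID onto the shared wake queue and then wakes the base
// waker so that the executor itself gets polled. The `awoken` flag deduplicates wakeups between
// polls of the task.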
struct TaskWaker {
    queue: Arc<WakeQueue>,
    awoken: AtomicBool,
    task_id: usize,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        // Ensure that we only push the task ID to the queue once per wakeup.
        // Relaxed memory ordering is fine here because AtomicWaker already provides strict memory
        // ordering.
        if self
            .awoken
            .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
        {
            self.queue.push(self.task_id);
            // Release memory ordering
            self.queue.base_waker.wake();
        }
    }
}

impl TaskWaker {
    fn new(queue: Arc<WakeQueue>, task_id: usize) -> Self {
        Self {
            awoken: AtomicBool::new(false),
            queue,
            task_id,
        }
    }

    fn waker_pair(queue: Arc<WakeQueue>, task_id: usize) -> (Arc<Self>, Waker) {
        let this = Arc::new(Self::new(queue, task_id));
        let waker = this.clone().into();
        (this, waker)
    }

    fn to_sleep(&self) {
        self.awoken.store(false, Ordering::Relaxed);
    }
}

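// A task that has been spawned but not yet picked up by `Executor::poll_spawned`, which moves it
// into the executor's task list as a `Task`.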
struct SpawnedTask<'a> {
    future: LocalBoxFuture<'a, ()>,
    handle_data: Rc<HandleData>,
}

struct Task<'a> {
    future: LocalBoxFuture<'a, ()>,
    handle_data: Rc<HandleData>,
    waker_pair: (Arc<TaskWaker>, Waker),
}

impl<'a> Task<'a> {
    fn poll(&mut self) -> Poll<()> {
        let (waker_data, waker) = &self.waker_pair;
        // Reset this waker so that it can produce wakeups again
        waker_data.to_sleep();
        self.future.as_mut().poll(&mut Context::from_waker(waker))
    }

    fn from_spawned(spawned_task: SpawnedTask<'a>, waker_pair: (Arc<TaskWaker>, Waker)) -> Self {
        let handle_data = spawned_task.handle_data;
        // Set up waker for the handle
        handle_data.waker.set(Some(waker_pair.1.clone()));
        Self {
            future: spawned_task.future,
            handle_data,
            waker_pair,
        }
    }
}

/// An async executor that can spawn tasks
///
/// # Example
///
/// Run a future that spawns tasks and captures the outside environment.
///
/// ```
/// use local_runtime::{block_on, Executor};
///
/// // Run future on current thread
/// block_on(async {
///     let n = 10;
///     let ex = Executor::new();
///     let out = ex.run(async {
///         // Spawn an async task that captures from the outside environment
///         let handle = ex.spawn(async { &n });
///         // Wait for the task to complete
///         handle.await
///     }).await;
///     assert_eq!(*out, 10);
/// });
/// ```
pub struct Executor<'a> {
    tasks: RefCell<Slab<Task<'a>>>,
    spawned: RefCell<Vec<SpawnedTask<'a>>>,
    wake_queue: Arc<WakeQueue>,
}

impl Default for Executor<'_> {
    fn default() -> Self {
        Self::new()
    }
}

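// Sentinel task ID reserved for the main future passed to `Executor::run`, so its wakeups can be
// told apart from those of spawned tasks.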
const MAIN_TASK_ID: usize = usize::MAX;

impl<'a> Executor<'a> {
    /// Create new executor
    pub fn new() -> Self {
        Self::with_capacity(4)
    }

    /// Create new executor with a pre-allocated capacity
    ///
    /// The executor will be able to hold at least `capacity` concurrent tasks without reallocating
    /// its internal storage.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            tasks: RefCell::new(Slab::with_capacity(capacity)),
            spawned: RefCell::new(Vec::with_capacity(capacity)),
            wake_queue: Arc::new(WakeQueue::with_capacity(capacity)),
        }
    }

    /// Spawn a task on the executor, returning a [`TaskHandle`] to it
    ///
    /// The provided future will run concurrently on the current thread while [`Executor::run`]
    /// runs, even if you don't await on the `TaskHandle`. If it's not awaited, there's no
    /// guarantee that the task will run to completion.
    ///
    /// To spawn additional tasks from inside of a spawned task, see [`Executor::spawn_rc`].
    ///
    /// ```no_run
    /// use std::net::TcpListener;
    /// use local_runtime::{io::Async, Executor, block_on};
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let ex = Executor::new();
    /// block_on(ex.run(async {
    ///     let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8080))?;
    ///     loop {
    ///         let (stream, _) = listener.accept().await?;
    ///         let _task = ex.spawn(async move {
    ///             // Process each connection concurrently
    ///             let _ = stream;
    ///         });
    ///     }
    ///     Ok(())
    /// }))
    /// # }
    /// ```
    pub fn spawn<T: 'a>(&self, fut: impl Future<Output = T> + 'a) -> TaskHandle<T> {
        let ret = Rc::new(RetData {
            value: Cell::new(None),
            waker: Cell::new(None),
        });
        let ret_clone = ret.clone();
        let handle_data = Rc::<HandleData>::default();

        let mut spawned = self.spawned.borrow_mut();
        spawned.push(SpawnedTask {
            future: Box::pin(async move {
                let retval = fut.await;
                let ret = ret_clone;
                ret.value.set(Some(retval));
                if let Some(waker) = ret.waker.take() {
                    waker.wake();
                }
            }),
            handle_data: handle_data.clone(),
        });
        TaskHandle { ret, handle_data }
    }

    /// Spawn a task using an [`Rc`] pointer to the executor, rather than a reference. This allows
    /// for spawning more tasks inside spawned tasks.
    ///
    /// When attempting "recursive" task spawning using [`Executor::spawn`], you will encounter
    /// borrow checker errors about the lifetime of the executor:
    ///
    /// ```compile_fail
    /// use local_runtime::Executor;
    ///
    /// let ex = Executor::new();
    /// //  -- binding `ex` declared here
    /// ex.block_on(async {
    ///     //      ----- value captured here by coroutine
    ///     let outer_task = ex.spawn(async {
    ///     //               ^^ borrowed value does not live long enough
    ///         let inner_task = ex.spawn(async { 10 });
    ///         inner_task.await;
    ///     });
    /// });
    /// // -
    /// // |
    /// // `ex` dropped here while still borrowed
    /// // borrow might be used here, when `ex` is dropped and runs the destructor for type `Executor<'_>`
    /// ```
    ///
    /// This happens because the future associated with the task is stored in the executor. So if
    /// `outer_task` contains a reference to the executor, then the executor will be storing a
    /// reference to itself, which is not allowed. To circumvent this issue, we need to put the
    /// executor behind an [`Rc`] pointer and clone it into every task that we want to spawn more
    /// tasks in. This is where [`Executor::spawn_rc`] comes in.
    ///
    /// Rather than taking a future, `spawn_rc` accepts a closure that takes an `Rc` to the
    /// executor and returns a future. This allows the future to capture the executor by value
    /// rather than by reference, getting rid of the borrow error.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use std::rc::Rc;
    /// use local_runtime::Executor;
    ///
    /// let ex = Rc::new(Executor::new());
    /// ex.block_on(async {
    ///     let outer_task = ex.clone().spawn_rc(|ex| async move {
    ///         let inner_task = ex.spawn(async { 10 });
    ///         inner_task.await;
    ///     });
    /// });
    /// ```
    pub fn spawn_rc<T: 'a, Fut: Future<Output = T> + 'a, F>(self: Rc<Self>, f: F) -> TaskHandle<T>
    where
        F: FnOnce(Rc<Self>) -> Fut + 'a,
    {
        let cl = self.clone();
        self.spawn(f(cl))
    }

    fn register_base_waker(&self, base_waker: &Waker) {
        // Acquire ordering
        self.wake_queue.base_waker.register(base_waker);
    }

    // Poll tasks that have been awoken, returning whether the main future has been awoken
    fn poll_tasks(&self) -> bool {
        let mut main_task_awoken = false;
        let mut tasks = self.tasks.borrow_mut();

        self.wake_queue.drain_for_each(|task_id| {
            if task_id == MAIN_TASK_ID {
                main_task_awoken = true;
            }
            // For each awoken task, find it if it still exists
            else if let Some(task) = tasks.get_mut(task_id) {
                // If a task is cancelled, don't poll it, just remove it
                if task.handle_data.cancelled.get() || task.poll().is_ready() {
                    tasks.remove(task_id);
                }
            }
        });

        main_task_awoken
    }

    // Poll newly spawned tasks and move them to the task list
    fn poll_spawned(&self) {
        let mut tasks = self.tasks.borrow_mut();
        // Keep checking newly spawned tasks until there are none left.
        // Reborrow the spawned tasks on every iteration, because the tasks themselves also need to
        // borrow the spawned tasks.
        while let Some(spawned_task) = self.spawned.borrow_mut().pop() {
            // Ignore cancelled tasks
            if spawned_task.handle_data.cancelled.get() {
                continue;
            }

            let next_vacancy = tasks.vacant_entry();
            // Use the Slab index as the task ID.
            // If the waker outlives the task or the task calls the waker even after returning
            // Ready, then it's possible for the waker to wake up another task that's replaced the
            // original task in the Slab. This should be rare, and at worst causes spurious wakeups.
            let task_id = next_vacancy.key();
            assert_ne!(task_id, MAIN_TASK_ID);
            let waker_pair = TaskWaker::waker_pair(self.wake_queue.clone(), task_id);
            let mut task = Task::from_spawned(spawned_task, waker_pair);
            // Only insert the task if it returns pending
            if task.poll().is_pending() {
                next_vacancy.insert(task);
            }
        }
    }

    /// Blocking version of [`Executor::run`].
    ///
    /// This is just a shorthand for calling `block_on(ex.run(fut))`.
    ///
    /// # Panic
    ///
    /// Calling this function within a task spawned on the same executor will panic.
    pub fn block_on<T>(&self, fut: impl Future<Output = T>) -> T {
        block_on(self.run(fut))
    }

    /// Drives the future to completion asynchronously while also driving all spawned tasks
    ///
    /// When this function completes, it will drop all unfinished tasks that were spawned on the
    /// executor.
    ///
    /// # Panic
    ///
    /// Polling the future returned by this function within a task spawned on the same executor
    /// will panic.
    ///
    /// # Example
    ///
    /// ```
    /// use std::net::UdpSocket;
    /// use std::io;
    /// use local_runtime::{block_on, Executor, Async};
    ///
    /// // Run future on current thread
    /// block_on(async {
    ///     let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
    ///     let addr = socket.get_ref().local_addr()?;
    ///     socket.connect(addr)?;
    ///
    ///     let ex = Executor::new();
    ///     ex.run(async {
    ///         let task = ex.spawn(async {
    ///             socket.send(b"hello").await?;
    ///             socket.send(b"hello").await?;
    ///             Ok::<_, io::Error>(())
    ///         });
    ///
    ///         let mut data = [0u8; 5];
    ///         socket.recv(&mut data).await?;
    ///         socket.recv(&mut data).await?;
    ///         task.await
    ///     }).await
    /// });
    /// ```
    pub async fn run<T>(&self, fut: impl Future<Output = T>) -> T {
        let mut fut = pin!(fut);
        // Create waker for main future
        let (main_waker_data, main_waker) =
            TaskWaker::waker_pair(self.wake_queue.clone(), MAIN_TASK_ID);
        self.wake_queue.reset(MAIN_TASK_ID);

        let out = poll_fn(move |cx| {
            self.register_base_waker(cx.waker());
            let main_task_awoken = self.poll_tasks();
            if main_task_awoken {
                main_waker_data.to_sleep();
                if let Poll::Ready(out) = fut.as_mut().poll(&mut Context::from_waker(&main_waker)) {
                    return Poll::Ready(out);
                }
            }
            self.poll_spawned();
            Poll::Pending
        })
        .await;

        // Drop all unfinished tasks so that any Rc<Executor> inside the tasks are dropped. This
        // prevents Rc-cycles and guarantees that the executor will be dropped later
        self.tasks.borrow_mut().clear();
        self.spawned.borrow_mut().clear();
        out
    }
}

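// Return-value slot shared between a spawned task and its `TaskHandle`. The task stores its
// output in `value` and wakes `waker`, which is registered by the handle when it is awaited.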
struct RetData<T> {
    value: Cell<Option<T>>,
    waker: Cell<Option<Waker>>,
}

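// State shared between a `TaskHandle` and the executor's copy of the task, used to propagate
// cancellation and to wake the task when it is cancelled.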
#[derive(Default)]
struct HandleData {
    cancelled: Cell<bool>,
    waker: Cell<Option<Waker>>,
}

/// A handle to a spawned task
///
/// A `TaskHandle` can be awaited to wait for the completion of its associated task and get its
/// result.
///
/// A `TaskHandle` detaches its task when dropped. This means it can no longer be awaited, but
/// the executor will still poll its task.
///
/// This is created by [`Executor::spawn`] and [`Executor::spawn_rc`].
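///
/// # Example
///
/// A small illustration of awaiting a handle to get the task's result:
///
/// ```
/// use local_runtime::Executor;
///
/// let ex = Executor::new();
/// let out = ex.block_on(async {
///     // Spawn a task, then wait for it through its handle
///     let handle = ex.spawn(async { 1 + 2 });
///     handle.await
/// });
/// assert_eq!(out, 3);
/// ```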
pub struct TaskHandle<T> {
    ret: Rc<RetData<T>>,
    handle_data: Rc<HandleData>,
}

impl<T> TaskHandle<T> {
    /// Cancel the task
    ///
    /// Deletes the task from the executor so that it won't be polled again.
    ///
    /// If the handle is awaited after cancellation, it might still complete if the task was
    /// already finished before it was cancelled. However, the more likely outcome is that it
    /// never completes.
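    ///
    /// # Example
    ///
    /// A minimal sketch of cancelling a background task that would otherwise never finish:
    ///
    /// ```
    /// use std::future::pending;
    /// use local_runtime::Executor;
    ///
    /// let ex = Executor::new();
    /// ex.block_on(async {
    ///     let task = ex.spawn(pending::<()>());
    ///     // The task can never complete on its own, so cancel it
    ///     task.cancel();
    ///     assert!(task.is_cancelled());
    /// });
    /// ```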
    pub fn cancel(&self) {
        self.handle_data.cancelled.set(true);
        // If the task has a waker, then it has already been added to the task list, so it needs to
        // be awoken in order for its cancellation status to be checked
        if let Some(waker) = self.handle_data.waker.take() {
            waker.wake();
        }
    }

    /// Check if this task is finished
    ///
    /// If this returns `true`, the next `poll` call is guaranteed to return [`Poll::Ready`].
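    ///
    /// # Example
    ///
    /// An illustrative sketch: once the executor has had a chance to poll the spawned task (here
    /// by sleeping briefly), the handle reports that it is finished:
    ///
    /// ```
    /// use std::time::Duration;
    /// use local_runtime::{time::sleep, Executor};
    ///
    /// let ex = Executor::new();
    /// ex.block_on(async {
    ///     let task = ex.spawn(async { 42 });
    ///     // Yield to the executor so the spawned task gets polled
    ///     sleep(Duration::from_millis(1)).await;
    ///     assert!(task.is_finished());
    /// });
    /// ```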
    pub fn is_finished(&self) -> bool {
        // SAFETY: We never get a long-lived reference to ret.value, so aliasing cannot occur
        unsafe { (*self.ret.value.as_ptr()).is_some() }
    }

    /// Check if this task has been cancelled
    pub fn is_cancelled(&self) -> bool {
        self.handle_data.cancelled.get()
    }
}

impl<T> Future for TaskHandle<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(val) = self.ret.value.take() {
            return Poll::Ready(val);
        }

        let mut waker = self.ret.waker.take();
        match &mut waker {
            Some(waker) => waker.clone_from(cx.waker()),
            None => waker = Some(cx.waker().clone()),
        }
        self.ret.waker.set(waker);
        Poll::Pending
    }
}

#[cfg(test)]
mod tests {
    use std::{future::pending, time::Duration};

    use crate::{test::MockWaker, time::sleep};

    use super::*;

    #[test]
    fn spawn_and_poll() {
        let ex = Executor::new();
        assert_eq!(ex.tasks.borrow().len(), 0);

        ex.spawn(pending::<()>());
        ex.spawn(pending::<()>());
        ex.spawn(pending::<()>());
        ex.poll_tasks();
        ex.poll_spawned();
        assert_eq!(ex.tasks.borrow().len(), 3);

        ex.spawn(async {});
        ex.spawn(async {});
        ex.poll_tasks();
        ex.poll_spawned();
        assert_eq!(ex.tasks.borrow().len(), 3);
    }

    #[test]
    fn task_waker() {
        let base_waker = Arc::new(MockWaker::default());
        let mut n = 0;
        let ex = Executor::new();
        ex.register_base_waker(&base_waker.clone().into());
        ex.spawn(poll_fn(|cx| {
            n += 1;
            cx.waker().wake_by_ref();
            Poll::<()>::Pending
        }));

        // Poll the spawned tasks, which should wake up right away
        ex.poll_spawned();
        assert_eq!(unsafe { (*ex.wake_queue.local.get()).len() }, 1);
        assert!(base_waker.get());
        // Poll the awoken task, which should wake up again
        ex.poll_tasks();
        assert_eq!(unsafe { (*ex.wake_queue.local.get()).len() }, 1);

        drop(ex);
        // Should have polled twice
        assert_eq!(n, 2);
    }

    #[test]
    fn cancel() {
        let ex = Executor::new();
        assert_eq!(ex.tasks.borrow().len(), 0);

        let task = ex.spawn(pending::<()>());
        // Cancel task while it's in the spawned list
        task.cancel();
        assert!(task.is_cancelled());
        ex.poll_tasks();
        ex.poll_spawned();
        assert_eq!(ex.tasks.borrow().len(), 0);

        let task = ex.spawn(pending::<()>());
        assert!(!task.is_cancelled());
        ex.poll_tasks();
        ex.poll_spawned();
        assert_eq!(ex.tasks.borrow().len(), 1);

        // Cancel task while it's in the task list
        task.cancel();
        ex.poll_tasks();
        ex.poll_spawned();
        assert_eq!(ex.tasks.borrow().len(), 0);
    }

    #[test]
    fn wake_queue() {
        let queue = WakeQueue::with_capacity(4);
        queue.push(12);
        queue.push(13);

        thread::scope(|s| {
            let queue = &queue;
            for i in 0..10 {
                s.spawn(move || queue.push(i));
            }
        });

        assert_eq!(queue.concurrent.len(), 10);
        assert_eq!(unsafe { (*queue.local.get()).len() }, 2);

        let mut elems = vec![];
        queue.drain_for_each(|e| elems.push(e));
        elems.sort_unstable();
        assert_eq!(elems, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 12, 13]);

        queue.push(12);
        queue.push(13);
        queue.reset(6);
        assert_eq!(queue.concurrent.len(), 0);
        assert_eq!(unsafe { (*queue.local.get()).len() }, 1);
        queue.drain_for_each(|e| assert_eq!(e, 6));
    }

    #[test]
    fn switch_waker() {
        let ex = Executor::new();
        let waker1 = Arc::new(MockWaker::default());
        let waker2 = Arc::new(MockWaker::default());

        let mut fut = pin!(ex.run(async {
            let _bg = ex.spawn(sleep(Duration::from_millis(100)));
            sleep(Duration::from_millis(50)).await;
            // Have the future wait forever without polling the background task
            pending::<()>().await;
        }));

        // Poll future with waker1
        assert!(fut
            .as_mut()
            .poll(&mut Context::from_waker(&waker1.clone().into()))
            .is_pending());
        // Wait for the reactor, which should notify waker1
        REACTOR.with(|r| r.wait()).unwrap();
        assert!(waker1.get());

        // Poll future with waker2
        assert!(fut
            .as_mut()
            .poll(&mut Context::from_waker(&waker2.clone().into()))
            .is_pending());
        // Wait for the reactor, which should notify waker2
        // even though the sleep task is never polled after switching to waker2
        REACTOR.with(|r| r.wait()).unwrap();
        assert!(waker2.get());
    }

    #[test]
    fn switch_waker_join() {
        let waker1 = Arc::new(MockWaker::default());
        let waker2 = Arc::new(MockWaker::default());

        let mut fut = pin!(join!(
            sleep(Duration::from_millis(50)),
            sleep(Duration::from_millis(100))
        ));

        // Poll future with waker1
        assert!(fut
            .as_mut()
            .poll(&mut Context::from_waker(&waker1.clone().into()))
            .is_pending());
        // Wait for the reactor, which should notify waker1
        REACTOR.with(|r| r.wait()).unwrap();
        assert!(waker1.get());

        // Poll future with waker2
        assert!(fut
            .as_mut()
            .poll(&mut Context::from_waker(&waker2.clone().into()))
            .is_pending());
        // Wait for the reactor, which should notify waker2
        // even though the sleep task is never polled after switching to waker2
        REACTOR.with(|r| r.wait()).unwrap();
        assert!(waker2.get());
    }
}
855}