local_runtime/lib.rs
1#![warn(missing_docs)]
2
3//! Thread-local async runtime
4//!
5//! This crate provides an async runtime that runs entirely within the current thread. As such, it
6//! can run futures that are `!Send` and non-`static`. If no future is able to make progress, the
7//! runtime will suspend the current thread until a future is ready to be polled.
8//!
9//! To actually run a future, see [`block_on`] or [`Executor::block_on`], which drives the future
10//! to completion on the current thread.
11//!
//! In addition, this crate provides [async timers](crate::time) and an [async adapter](Async)
13//! for standard I/O types, similar to
14//! [`async-io`](https://docs.rs/async-io/latest/async_io/index.html).
15//!
16//! # Implementation
17//!
18//! Task wakeups are handled by a thread-local reactor, which keeps track of all I/O events and
19//! timers in the current thread along with their associated wakers. Waiting for the reactor is
20//! done by [`block_on`], without needing a separate thread.
21//!
22//! The implementation of the reactor depends on the platform. On Unix systems, the reactor uses
23//! [`poll`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/poll.html). Currently,
24//! Windows is not supported.
25//!
26//! # Concurrency
27//!
28//! The [`Executor`] can spawn tasks that run concurrently on the same thread. Alternatively, this
29//! crate provides macros such as [`join`] and [`merge_futures`] for concurrent execution.
30//!
31//! # Compatibility
32//!
33//! Unlike other runtimes, `local_runtime` doesn't run the reactor in the background, instead
34//! relying on [`block_on`] to run the reactor while polling the future. Since leaf futures from
35//! this crate, such as [`Async`] and timers, rely on the reactor to wake up, **they can only be
36//! driven by [`block_on`], and are not compatible with other runtimes**.
37//!
38//! # Examples
39//!
40//! Listen for connections on a local port, while concurrently making connections to localhost.
41//! Return with error if any operation fails.
42//!
43//! ```no_run
44//! use std::{net::{TcpStream, TcpListener}, time::Duration, io};
45//! use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
46//! use local_runtime::{io::Async, time::sleep, Executor};
47//!
48//! # fn main() -> std::io::Result<()> {
49//! let ex = Executor::new();
50//! ex.block_on(async {
51//! let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
52//! let addr = listener.get_ref().local_addr()?;
53//!
54//! // Run this task in the background
55//! let _bg = ex.spawn(async move {
56//! // Listen for connections on local port
57//! loop {
58//! let (mut stream, _) = listener.accept().await?;
59//! let mut buf = [0u8; 5];
60//! stream.read_exact(&mut buf).await?;
61//! assert_eq!(&buf, b"hello");
62//! }
63//! Ok::<_, io::Error>(())
64//! });
65//!
//! // Connect to the listener repeatedly with 500us delay
67//! loop {
68//! let mut stream = Async::<TcpStream>::connect(addr).await?;
69//! stream.write_all(b"hello").await?;
70//! sleep(Duration::from_micros(500)).await;
71//! }
72//! Ok::<_, io::Error>(())
73//! })?;
74//! # Ok(())
75//! # }
76//! ```
77
78mod concurrency;
79pub mod io;
80mod reactor;
81#[cfg(test)]
82mod test;
83pub mod time;
84
85use std::{
86 cell::{Cell, RefCell, UnsafeCell},
87 collections::VecDeque,
88 fmt::Debug,
89 future::{poll_fn, Future},
90 num::NonZero,
91 pin::{pin, Pin},
92 rc::Rc,
93 sync::{
94 atomic::{AtomicBool, Ordering},
95 Arc,
96 },
97 task::{Context, Poll, Wake, Waker},
98 thread::{self, ThreadId},
99};
100
101use atomic_waker::AtomicWaker;
102use concurrent_queue::ConcurrentQueue;
103use futures_core::future::LocalBoxFuture;
104use slab::Slab;
105
106#[doc(hidden)]
107pub use concurrency::{JoinFuture, MergeFutureStream, MergeStream};
108pub use io::Async;
109use reactor::{Notifier, REACTOR};
110
/// Non-zero numeric identifier.
///
/// Wrapping a [`NonZero`] transparently means `Option<Id>` is the same size as `usize`.
#[repr(transparent)]
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Ord, Eq, Hash)]
struct Id(NonZero<usize>);

impl Id {
    /// Builds an ID from `n`.
    ///
    /// # Panics
    ///
    /// Panics if `n` is zero.
    const fn new(n: usize) -> Self {
        match NonZero::new(n) {
            Some(value) => Id(value),
            None => panic!("expected non-zero ID"),
        }
    }

    /// Returns the ID following this one, wrapping back around to 1 when the counter overflows.
    const fn overflowing_incr(&self) -> Self {
        if let Some(next) = self.0.checked_add(1) {
            Self(next)
        } else {
            const { Id::new(1) }
        }
    }
}
129
130impl Wake for Notifier {
131 fn wake(self: Arc<Self>) {
132 let _ = self.notify();
133 }
134}
135
136/// Drives a future to completion on the current thread, processing I/O events when idle.
137///
138/// Does not support task spawning (see [`Executor::run`]).
139///
140/// # Example
141///
142/// ```
143/// use std::time::Duration;
144/// use local_runtime::time::Timer;
145///
146/// local_runtime::block_on(async {
147/// Timer::delay(Duration::from_millis(10)).await;
148/// });
149/// ```
150pub fn block_on<T, F>(mut fut: F) -> T
151where
152 F: Future<Output = T>,
153{
154 let mut fut = pin!(fut);
155 let waker = REACTOR.with(|r| r.notifier()).into();
156
157 loop {
158 if let Poll::Ready(out) = fut.as_mut().poll(&mut Context::from_waker(&waker)) {
159 return out;
160 }
161
162 let wait_res = REACTOR.with(|r| r.wait());
163 if let Err(err) = wait_res {
164 log::error!(
165 "{:?} Error polling reactor: {err}",
166 std::thread::current().id()
167 );
168 }
169 }
170}
171
172#[derive(Debug)]
173struct WakeQueue {
174 base_waker: AtomicWaker,
175 local_thread: ThreadId,
176 local: UnsafeCell<VecDeque<usize>>,
177 concurrent: ConcurrentQueue<usize>,
178}
179
180// SAFETY: The thread-unsafety comes from `local`, which will only be accessed if the current
181// thread equals `local_thread`. As such, `local` will never be accessed from multiple threads.
182unsafe impl Send for WakeQueue {}
183unsafe impl Sync for WakeQueue {}
184
185impl WakeQueue {
186 fn with_capacity(capacity: usize) -> Self {
187 Self {
188 base_waker: AtomicWaker::new(),
189 local_thread: thread::current().id(),
190 local: UnsafeCell::new(VecDeque::with_capacity(capacity)),
191 concurrent: ConcurrentQueue::unbounded(),
192 }
193 }
194
195 fn push(&self, val: usize) {
196 if thread::current().id() == self.local_thread {
197 // SAFETY: Like all other accesses to `local`, this access can only happen if current
198 // thread is `local_thread`, and also has limited lifetime. As such, this access will
199 // never cause a data race or collide with any other access of `local`.
200 unsafe { (*self.local.get()).push_back(val) };
201 } else {
202 // If queue is closed, then just don't do anything
203 let _ = self.concurrent.push(val);
204 }
205 }
206
207 fn drain_for_each<F: FnMut(usize)>(&self, mut f: F) {
208 if thread::current().id() == self.local_thread {
209 // SAFETY: Like all other accesses to `local`, this access can only happen if current
210 // thread is `local_thread`, and also has limited lifetime. As such, this access will
211 // never cause a data race or collide with any other access of `local`.
212 let local_len = unsafe { (*self.local.get()).len() };
213 let con_len = self.concurrent.len();
214
215 log::trace!(
216 "{:?} {local_len} local wakeups, {con_len} concurrent wakeups",
217 std::thread::current().id()
218 );
219
220 // Set upperbounds for the iteration on the two queues to ensure we never loop forever
221 // if the callback also adds values to the queue
222 for _ in 0..local_len {
223 let val = unsafe { (*self.local.get()).pop_front().unwrap() };
224 f(val);
225 }
226 for val in self.concurrent.try_iter().take(con_len) {
227 f(val);
228 }
229 }
230 }
231
232 fn reset(&self, init_val: usize) {
233 if thread::current().id() == self.local_thread {
234 // SAFETY: Like all other accesses to `local`, this access can only happen if current
235 // thread is `local_thread`, and also has limited lifetime. As such, this access will
236 // never cause a data race or collide with any other access of `local`.
237 unsafe {
238 (*self.local.get()).clear();
239 (*self.local.get()).push_back(init_val);
240 }
241 // Pop all remaining elements
242 while self.concurrent.pop().is_ok() {}
243 }
244 }
245}
246
247struct TaskWaker {
248 queue: Arc<WakeQueue>,
249 awoken: AtomicBool,
250 task_id: usize,
251}
252
253impl Wake for TaskWaker {
254 fn wake(self: Arc<Self>) {
255 // Ensure that we only push the task ID to the queue once per wakeup
256 // Use relaxed memory ordering here AtomicWaker already provides strict memory ordering
257 if self
258 .awoken
259 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
260 .is_ok()
261 {
262 self.queue.push(self.task_id);
263 // Release memory ordering
264 self.queue.base_waker.wake();
265 }
266 }
267}
268
269impl TaskWaker {
270 fn new(queue: Arc<WakeQueue>, task_id: usize) -> Self {
271 Self {
272 awoken: AtomicBool::new(false),
273 queue,
274 task_id,
275 }
276 }
277
278 fn waker_pair(queue: Arc<WakeQueue>, task_id: usize) -> (Arc<Self>, Waker) {
279 let this = Arc::new(Self::new(queue, task_id));
280 let waker = this.clone().into();
281 (this, waker)
282 }
283
284 fn to_sleep(&self) {
285 self.awoken.store(false, Ordering::Relaxed);
286 }
287}
288
289struct SpawnedTask<'a> {
290 future: LocalBoxFuture<'a, ()>,
291 handle_data: Rc<HandleData>,
292}
293
294struct Task<'a> {
295 future: LocalBoxFuture<'a, ()>,
296 handle_data: Rc<HandleData>,
297 waker_pair: (Arc<TaskWaker>, Waker),
298}
299
300impl<'a> Task<'a> {
301 fn poll(&mut self) -> Poll<()> {
302 let (waker_data, waker) = &self.waker_pair;
303 // Reset this waker so that it can produce wakeups again
304 waker_data.to_sleep();
305 self.future.as_mut().poll(&mut Context::from_waker(waker))
306 }
307
308 fn from_spawned(spawned_task: SpawnedTask<'a>, waker_pair: (Arc<TaskWaker>, Waker)) -> Self {
309 let handle_data = spawned_task.handle_data;
310 // Set up waker for the handle
311 handle_data.waker.set(Some(waker_pair.1.clone()));
312 Self {
313 future: spawned_task.future,
314 handle_data,
315 waker_pair,
316 }
317 }
318}
319
320/// An async executor that can spawn tasks
321///
322/// # Example
323///
324/// Run a future that spawns tasks and captures the outside environment.
325///
326/// ```
327/// use local_runtime::{block_on, Executor};
328///
329/// // Run future on current thread
330/// block_on(async {
331/// let n = 10;
332/// let ex = Executor::new();
333/// let out = ex.run(async {
334/// // Spawn an async task that captures from the outside environment
335/// let handle = ex.spawn(async { &n });
336/// // Wait for the task to complete
337/// handle.await
338/// }).await;
339/// assert_eq!(*out, 10);
340/// });
341/// ```
342pub struct Executor<'a> {
343 tasks: RefCell<Slab<Task<'a>>>,
344 spawned: RefCell<Vec<SpawnedTask<'a>>>,
345 wake_queue: Arc<WakeQueue>,
346}
347
348impl Default for Executor<'_> {
349 fn default() -> Self {
350 Self::new()
351 }
352}
353
354const MAIN_TASK_ID: usize = usize::MAX;
355
356impl<'a> Executor<'a> {
357 /// Create new executor
358 pub fn new() -> Self {
359 Self::with_capacity(4)
360 }
361
362 /// Create new executor with a pre-allocated capacity
363 ///
364 /// The executor will be able to hold at least `capacity` concurrent tasks without reallocating
365 /// its internal storage.
366 pub fn with_capacity(capacity: usize) -> Self {
367 Self {
368 tasks: RefCell::new(Slab::with_capacity(capacity)),
369 spawned: RefCell::new(Vec::with_capacity(capacity)),
370 wake_queue: Arc::new(WakeQueue::with_capacity(capacity)),
371 }
372 }
373
374 /// Spawn a task on the executor, returning a [`TaskHandle`] to it
375 ///
376 /// The provided future will run concurrently on the current thread while [`Executor::run`]
377 /// runs, even if you don't await on the `TaskHandle`. If it's not awaited, there's no
378 /// guarantee that the task will run to completion.
379 ///
380 /// To spawn additional tasks from inside of a spawned task, see [`Executor::spawn_rc`].
381 ///
382 /// ```no_run
383 /// use std::net::TcpListener;
384 /// use local_runtime::{io::Async, Executor, block_on};
385 ///
386 /// # fn main() -> std::io::Result<()> {
387 /// let ex = Executor::new();
388 /// block_on(ex.run(async {
389 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8080))?;
390 /// loop {
391 /// let mut stream = listener.accept().await?;
392 /// let task = ex.spawn(async move {
393 /// // Process each connection concurrently
394 /// });
395 /// }
396 /// Ok(())
397 /// }))
398 /// # }
399 /// ```
400 pub fn spawn<T: 'a>(&self, fut: impl Future<Output = T> + 'a) -> TaskHandle<T> {
401 let ret = Rc::new(RetData {
402 value: Cell::new(None),
403 waker: Cell::new(None),
404 });
405 let ret_clone = ret.clone();
406 let handle_data = Rc::<HandleData>::default();
407
408 let mut spawned = self.spawned.borrow_mut();
409 spawned.push(SpawnedTask {
410 future: Box::pin(async move {
411 let retval = fut.await;
412 let ret = ret_clone;
413 ret.value.set(Some(retval));
414 if let Some(waker) = ret.waker.take() {
415 waker.wake();
416 }
417 }),
418 handle_data: handle_data.clone(),
419 });
420 TaskHandle { ret, handle_data }
421 }
422
423 /// Spawn a task using a [`Rc`] pointer to the executor, rather than a reference. This allows
424 /// for spawning more tasks inside spawned tasks.
425 ///
426 /// When attempting "recursive" task spawning using [`Executor::spawn`], you will encounter
427 /// borrow checker errors about the lifetime of the executor:
428 ///
429 /// ```compile_fail
430 /// use local_runtime::Executor;
431 ///
432 /// let ex = Executor::new();
433 /// // -- binding `ex` declared here
434 /// ex.block_on(async {
435 /// // ----- value captured here by coroutine
436 /// let outer_task = ex.spawn(async {
437 /// // ^^ borrowed value does not live long enough
438 /// let inner_task = ex.spawn(async { 10 });
439 /// inner_task.await;
440 /// });
441 /// });
442 /// // -
443 /// // |
444 /// // `ex` dropped here while still borrowed
445 /// // borrow might be used here, when `ex` is dropped and runs the destructor for type `Executor<'_>`
446 /// ```
447 ///
448 /// This happens because the future associated with the task is stored in the executor. So if
449 /// `outer_task` contains a reference to the executor, then the executor will be storing a
450 /// reference to itself, which is not allowed. To circumvent this issue, we need to put the
451 /// executor behind a [`Rc`] pointer and clone it into every task that we want to spawn more
452 /// tasks in. This is where [`Executor::spawn_rc`] comes in.
453 ///
454 /// Rather than taking a future, `spawn_rc` accepts a closure that takes a `Rc` to executor
455 /// and returns a future. This allows the future to capture the executor by value rather than
456 /// by reference, getting rid of the borrow error.
457 ///
458 /// # Example
459 ///
460 /// ```no_run
461 /// use std::rc::Rc;
462 /// use local_runtime::Executor;
463 ///
464 /// let ex = Rc::new(Executor::new());
465 /// ex.block_on(async {
466 /// let outer_task = ex.clone().spawn_rc(|ex| async move {
467 /// let inner_task = ex.spawn(async { 10 });
468 /// inner_task.await;
469 /// });
470 /// });
471 /// ```
472 pub fn spawn_rc<T: 'a, Fut: Future<Output = T> + 'a, F>(self: Rc<Self>, f: F) -> TaskHandle<T>
473 where
474 F: FnOnce(Rc<Self>) -> Fut + 'a,
475 {
476 let cl = self.clone();
477 self.spawn(f(cl))
478 }
479
480 fn register_base_waker(&self, base_waker: &Waker) {
481 // Acquire ordering
482 self.wake_queue.base_waker.register(base_waker);
483 }
484
485 // Poll tasks that have been awoken, returning whether the main future has been awoken
486 fn poll_tasks(&self) -> bool {
487 let mut main_task_awoken = false;
488 let mut tasks = self.tasks.borrow_mut();
489
490 self.wake_queue.drain_for_each(|task_id| {
491 if task_id == MAIN_TASK_ID {
492 main_task_awoken = true;
493 }
494 // For each awoken task, find it if it still exists
495 else if let Some(task) = tasks.get_mut(task_id) {
496 // If a task is cancelled, don't poll it, just remove it
497 if task.handle_data.cancelled.get() || task.poll().is_ready() {
498 tasks.remove(task_id);
499 }
500 }
501 });
502
503 main_task_awoken
504 }
505
506 // Poll newly spawned tasks and move them to the task list
507 fn poll_spawned(&self) {
508 let mut tasks = self.tasks.borrow_mut();
509 // Keep checking newly spawned tasks until there's no more left.
510 // Reborrow the spawned tasks on every iteration, because the tasks themselves also need to
511 // borrow the spawned tasks.
512 while let Some(spawned_task) = self.spawned.borrow_mut().pop() {
513 // Ignore cancelled tasks
514 if spawned_task.handle_data.cancelled.get() {
515 continue;
516 }
517
518 let next_vacancy = tasks.vacant_entry();
519 // Use the Slab index as the task ID.
520 // If the waker outlives the task or the task calls the waker even after returning
521 // Ready, then it's possible for the waker to wake up another task that's replaced the
522 // original task in the Slab. This should be rare, and at worse causes spurious wakeups.
523 let task_id = next_vacancy.key();
524 assert_ne!(task_id, MAIN_TASK_ID);
525 let waker_pair = TaskWaker::waker_pair(self.wake_queue.clone(), task_id);
526 let mut task = Task::from_spawned(spawned_task, waker_pair);
527 // Only insert the task if it returns pending
528 if task.poll().is_pending() {
529 next_vacancy.insert(task);
530 }
531 }
532 }
533
534 /// Blocking version of [`Executor::run`].
535 ///
536 /// This is just a shorthand for calling `block_on(ex.run(fut))`.
537 ///
538 /// # Panic
539 ///
540 /// Calling this function within a task spawned on the same executor will panic.
541 pub fn block_on<T>(&self, fut: impl Future<Output = T>) -> T {
542 block_on(self.run(fut))
543 }
544
545 /// Drives the future to completion asynchronously while also driving all spawned tasks
546 ///
547 /// When this function completes, it will drop all unfinished tasks that were spawned on the
548 /// executor.
549 ///
550 /// # Panic
551 ///
552 /// Polling the future returned by this function within a task spawned on the same executor will
553 /// panic.
554 ///
555 /// # Example
556 ///
557 /// ```
558 /// use std::net::UdpSocket;
559 /// use std::io;
560 /// use local_runtime::{block_on, Executor, Async};
561 ///
562 /// // Run future on current thread
563 /// block_on(async {
564 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
565 /// let addr = socket.get_ref().local_addr()?;
566 /// socket.connect(addr)?;
567 ///
568 /// let ex = Executor::new();
569 /// ex.run(async {
570 /// let task = ex.spawn(async {
571 /// socket.send(b"hello").await?;
572 /// socket.send(b"hello").await?;
573 /// Ok::<_, io::Error>(())
574 /// });
575 ///
576 /// let mut data = [0u8; 5];
577 /// socket.recv(&mut data).await?;
578 /// socket.recv(&mut data).await?;
579 /// task.await
580 /// }).await
581 /// });
582 /// ```
583 pub async fn run<T>(&self, fut: impl Future<Output = T>) -> T {
584 let mut fut = pin!(fut);
585 // Create waker for main future
586 let (main_waker_data, main_waker) =
587 TaskWaker::waker_pair(self.wake_queue.clone(), MAIN_TASK_ID);
588 self.wake_queue.reset(MAIN_TASK_ID);
589
590 let out = poll_fn(move |cx| {
591 self.register_base_waker(cx.waker());
592 let main_task_awoken = self.poll_tasks();
593 if main_task_awoken {
594 main_waker_data.to_sleep();
595 if let Poll::Ready(out) = fut.as_mut().poll(&mut Context::from_waker(&main_waker)) {
596 return Poll::Ready(out);
597 }
598 }
599 self.poll_spawned();
600 Poll::Pending
601 })
602 .await;
603
604 // Drop all unfinished tasks so that any Rc<Executor> inside the tasks are dropped. This
605 // prevents Rc-cycles and guarantees that the executor will be dropped later
606 self.tasks.borrow_mut().clear();
607 self.spawned.borrow_mut().clear();
608 out
609 }
610}
611
/// Slot shared between a spawned task and its [`TaskHandle`], used to pass the task's output.
struct RetData<T> {
    /// Output of the task, stored once the task's future completes
    value: Cell<Option<T>>,
    /// Waker from the last pending poll of the handle, woken once the output is stored
    waker: Cell<Option<Waker>>,
}

/// State shared between a task and its [`TaskHandle`], used for cancellation.
#[derive(Default)]
struct HandleData {
    /// Whether the task has been cancelled through its handle
    cancelled: Cell<bool>,
    /// The task's own waker, set once the task has been assigned one
    waker: Cell<Option<Waker>>,
}

/// A handle to a spawned task
///
/// A `TaskHandle` can be awaited to wait for the completion of its associated task and get its
/// result.
///
/// A `TaskHandle` detaches its task when dropped. This means that it can no longer be awaited, but
/// the executor will still poll its task.
///
/// This is created by [`Executor::spawn`] and [`Executor::spawn_rc`].
pub struct TaskHandle<T> {
    ret: Rc<RetData<T>>,
    handle_data: Rc<HandleData>,
}

impl<T> TaskHandle<T> {
    /// Cancel the task
    ///
    /// Deletes the task from the executor so that it won't be polled again.
    ///
    /// If the handle is awaited after cancellation, it might still complete if the task was
    /// already finished before it was cancelled. However, the likelier outcome is that it never
    /// completes.
    pub fn cancel(&self) {
        let shared = &self.handle_data;
        shared.cancelled.set(true);
        // If the task has a waker, then it has already been added to the task list, so it needs to
        // be awoken in order for its cancellation status to be checked
        if let Some(task_waker) = shared.waker.take() {
            task_waker.wake();
        }
    }

    /// Check if this task is finished
    ///
    /// If this returns `true`, the next `poll` call is guaranteed to return [`Poll::Ready`].
    pub fn is_finished(&self) -> bool {
        let slot: *mut Option<T> = self.ret.value.as_ptr();
        // SAFETY: We never get a long-lived reference to ret.value, so aliasing cannot occur
        unsafe { (*slot).is_some() }
    }

    /// Check if this task has been cancelled
    pub fn is_cancelled(&self) -> bool {
        let shared = &self.handle_data;
        shared.cancelled.get()
    }
}

impl<T> Future for TaskHandle<T> {
    type Output = T;

    /// Returns the task's output if it is available, otherwise stores the caller's waker so that
    /// the task can wake it upon completion.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let shared = &self.ret;
        match shared.value.take() {
            Some(output) => Poll::Ready(output),
            None => {
                // Update the stored waker in place if there is one, otherwise store a fresh clone
                let stored = match shared.waker.take() {
                    Some(mut existing) => {
                        existing.clone_from(cx.waker());
                        existing
                    }
                    None => cx.waker().clone(),
                };
                shared.waker.set(Some(stored));
                Poll::Pending
            }
        }
    }
}
685
#[cfg(test)]
mod tests {
    use std::{future::pending, time::Duration};

    use crate::{test::MockWaker, time::sleep};

    use super::*;

    /// Number of wakeups sitting in the thread-local part of `queue`.
    fn local_wakeups(queue: &WakeQueue) -> usize {
        // SAFETY: Only called from the thread that created the queue, and the access ends with
        // this statement
        unsafe { (*queue.local.get()).len() }
    }

    /// Number of tasks in the executor's task list.
    fn task_count(ex: &Executor<'_>) -> usize {
        ex.tasks.borrow().len()
    }

    /// One executor step: poll the awoken tasks, then the newly spawned ones.
    fn drive(ex: &Executor<'_>) {
        ex.poll_tasks();
        ex.poll_spawned();
    }

    /// Polls `fut` once with a waker backed by `waker`.
    fn poll_with<F: Future>(fut: Pin<&mut F>, waker: &Arc<MockWaker>) -> Poll<F::Output> {
        let waker: Waker = Arc::clone(waker).into();
        fut.poll(&mut Context::from_waker(&waker))
    }

    #[test]
    fn spawn_and_poll() {
        let ex = Executor::new();
        assert_eq!(task_count(&ex), 0);

        // Tasks that never finish stay in the task list
        for _ in 0..3 {
            ex.spawn(pending::<()>());
        }
        drive(&ex);
        assert_eq!(task_count(&ex), 3);

        // Tasks that finish on the first poll never make it into the task list
        for _ in 0..2 {
            ex.spawn(async {});
        }
        drive(&ex);
        assert_eq!(task_count(&ex), 3);
    }

    #[test]
    fn task_waker() {
        let base_waker = Arc::new(MockWaker::default());
        let mut n = 0;
        let ex = Executor::new();
        let base: Waker = base_waker.clone().into();
        ex.register_base_waker(&base);
        ex.spawn(poll_fn(|cx| {
            n += 1;
            cx.waker().wake_by_ref();
            Poll::<()>::Pending
        }));

        // Poll the spawned tasks, which should wake up right away
        ex.poll_spawned();
        assert_eq!(local_wakeups(&ex.wake_queue), 1);
        assert!(base_waker.get());
        // Poll the awoken task, which should wake up again
        ex.poll_tasks();
        assert_eq!(local_wakeups(&ex.wake_queue), 1);

        drop(ex);
        // Should have polled twice
        assert_eq!(n, 2);
    }

    #[test]
    fn cancel() {
        let ex = Executor::new();
        assert_eq!(task_count(&ex), 0);

        // Cancel task while it's in the spawned list
        let early = ex.spawn(pending::<()>());
        early.cancel();
        assert!(early.is_cancelled());
        drive(&ex);
        assert_eq!(task_count(&ex), 0);

        let late = ex.spawn(pending::<()>());
        assert!(!late.is_cancelled());
        drive(&ex);
        assert_eq!(task_count(&ex), 1);

        // Cancel task while it's in the task list
        late.cancel();
        drive(&ex);
        assert_eq!(task_count(&ex), 0);
    }

    #[test]
    fn wake_queue() {
        let queue = WakeQueue::with_capacity(4);
        queue.push(12);
        queue.push(13);

        // Pushes from other threads must land in the concurrent queue
        thread::scope(|s| {
            let queue = &queue;
            (0..10).for_each(|i| {
                s.spawn(move || queue.push(i));
            });
        });

        assert_eq!(queue.concurrent.len(), 10);
        assert_eq!(local_wakeups(&queue), 2);

        let mut drained = vec![];
        queue.drain_for_each(|e| drained.push(e));
        drained.sort_unstable();
        assert_eq!(drained, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 12, 13]);

        // Resetting discards everything except the initial value
        queue.push(12);
        queue.push(13);
        queue.reset(6);
        assert_eq!(queue.concurrent.len(), 0);
        assert_eq!(local_wakeups(&queue), 1);
        queue.drain_for_each(|e| assert_eq!(e, 6));
    }

    #[test]
    fn switch_waker() {
        let ex = Executor::new();
        let waker1 = Arc::new(MockWaker::default());
        let waker2 = Arc::new(MockWaker::default());

        let mut fut = pin!(ex.run(async {
            let _bg = ex.spawn(sleep(Duration::from_millis(100)));
            sleep(Duration::from_millis(50)).await;
            // Have the future wait forever without polling the background task
            pending::<()>().await;
        }));

        // Poll future with waker1
        assert!(poll_with(fut.as_mut(), &waker1).is_pending());
        // Wait for the reactor, which should notify waker1
        REACTOR.with(|r| r.wait()).unwrap();
        assert!(waker1.get());

        // Poll future with waker2
        assert!(poll_with(fut.as_mut(), &waker2).is_pending());
        // Wait for the reactor, which should notify waker2
        // even though the sleep task is never polled after switching to waker2
        REACTOR.with(|r| r.wait()).unwrap();
        assert!(waker2.get());
    }

    #[test]
    fn switch_waker_join() {
        let waker1 = Arc::new(MockWaker::default());
        let waker2 = Arc::new(MockWaker::default());

        let mut fut = pin!(join!(
            sleep(Duration::from_millis(50)),
            sleep(Duration::from_millis(100))
        ));

        // Poll future with waker1
        assert!(poll_with(fut.as_mut(), &waker1).is_pending());
        // Wait for the reactor, which should notify waker1
        REACTOR.with(|r| r.wait()).unwrap();
        assert!(waker1.get());

        // Poll future with waker2
        assert!(poll_with(fut.as_mut(), &waker2).is_pending());
        // Wait for the reactor, which should notify waker2
        // even though the sleep task is never polled after switching to waker2
        REACTOR.with(|r| r.wait()).unwrap();
        assert!(waker2.get());
    }
}
855}