yash_executor/lib.rs
1// This file is part of yash, an extended POSIX shell.
2// Copyright (C) 2024 WATANABE Yuki
3
4//! `yash-executor` is a library for running concurrent tasks in a
5//! single-threaded context. This crate supports `no_std` configurations but
6//! requires the `alloc` crate.
7//!
8//! The [`Executor`] provided by this crate can be instantiated more than once
9//! to run multiple sets of tasks concurrently. Each executor maintains its
10//! own set of tasks and does not share tasks with other executors. This is
11//! different from other executor implementations that use a global or
12//! thread-local executor.
13//!
14//! This crate is free of locks and atomic operations at the cost of
15//! [unsafe spawning](Executor::spawn_pinned). Wakers used in this crate are
16//! thread-unsafe and not guarded by locks or atomics, so you must ensure that
17//! wakers are not shared between threads.
18//!
19//! ```
20//! # use yash_executor::Executor;
21//! # use yash_executor::forwarder::TryReceiveError;
22//! let executor = Executor::new();
23//!
24//! // Spawn a task that returns 42
25//! let receiver = unsafe { executor.spawn(async { 42 }) };
26//!
27//! // The task is not yet complete
28//! assert_eq!(receiver.try_receive(), Err(TryReceiveError::NotSent));
29//!
30//! // Run the executor until the task is complete
31//! executor.run_until_stalled();
32//!
33//! // Now we have the result
34//! assert_eq!(receiver.try_receive(), Ok(42));
35//! ```
36//!
37//! [`Spawner`]s provide a subset of the functionality of [`Executor`] to allow
38//! spawning tasks without access to the full executor. It is useful for adding
39//! tasks from within another task without creating cyclic dependencies, which
40//! can cause memory leaks.
41//!
42//! The [`forwarder`] module provides utilities for forwarding the result of a
43//! future to another future. The [`forwarder`](forwarder::forwarder) function
44//! creates a pair of [`Sender`] and [`Receiver`] that share an internal state
45//! to communicate the result of a future. A `Receiver` is also returned from
46//! the [`Executor::spawn`] method to receive the result of a future.
47//!
48//! [`Sender`]: forwarder::Sender
49//! [`Receiver`]: forwarder::Receiver
50
51#![no_std]
52extern crate alloc;
53
54use alloc::boxed::Box;
55use alloc::collections::VecDeque;
56use alloc::rc::{Rc, Weak};
57use core::cell::RefCell;
58use core::fmt::Debug;
59use core::future::Future;
60use core::pin::Pin;
61
62/// Interface for running concurrent tasks
63///
64/// You call the [`spawn_pinned`](Self::spawn_pinned) or [`spawn`](Self::spawn)
65/// method to add a task to the executor. Just adding a task to the executor
66/// does not run it. You need to call the [`step`](Self::step) or
67/// [`run_until_stalled`](Self::run_until_stalled) method to run the tasks.
68///
69/// `Executor` implements `Clone` but all clones share the same set of tasks.
70/// Separately created `Executor` instances do not share tasks.
71#[derive(Clone, Debug, Default)]
72pub struct Executor<'a> {
73 state: Rc<RefCell<ExecutorState<'a>>>,
74}
75
76/// Interface for spawning tasks
77///
78/// `Spawner` provides a subset of the functionality of `Executor` to allow
79/// spawning tasks without access to the full executor.
80///
81/// `Spawner` instances can be cloned and share the same executor state.
82/// `Spawner`s maintain a weak reference to the executor state, so they do not
83/// prevent the executor from being dropped. If the executor is dropped, the
84/// `Spawner` will not be able to spawn any more tasks.
85///
86/// To obtain a `Spawner` from an `Executor`, use the [`Executor::spawner`]
87/// method. The [`dead`](Self::dead) and `default` functions return a `Spawner`
88/// that can never spawn tasks.
89///
90/// ```
91/// # use yash_executor::Executor;
92/// let executor = Executor::new();
93/// let spawner = executor.spawner();
94/// let final_receiver = unsafe {
95/// executor.spawn(async move {
96/// let receiver_1 = spawner.spawn(async { 1 }).unwrap();
97/// let receiver_2 = spawner.spawn(async { 2 }).unwrap();
98/// receiver_2.await + receiver_1.await
99/// })
100/// };
101/// executor.run_until_stalled();
102/// assert_eq!(final_receiver.try_receive(), Ok(3));
103/// ```
104#[derive(Clone, Debug, Default)]
105pub struct Spawner<'a> {
106 state: Weak<RefCell<ExecutorState<'a>>>,
107}
108
109/// Internal state of the executor
110#[derive(Default)]
111struct ExecutorState<'a> {
112 /// Queue of woken tasks to be executed
113 ///
114 /// Tasks are added to the queue when they are woken up by another task or
115 /// when they are spawned. The executor removes a task from the queue and
116 /// polls it once. If the poll method returns `Poll::Pending`, the task
117 /// needs to be added back to the queue by some waker when it is ready to
118 /// be polled again.
119 wake_queue: VecDeque<Rc<Task<'a>>>,
120 // We don't need to store tasks that are waiting to be woken up because they
121 // are retained by wakers. This also prevents leaking tasks that are never
122 // woken up.
123}
124
125impl Debug for ExecutorState<'_> {
126 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
127 f.debug_struct("ExecutorState")
128 .field(
129 "wake_queue",
130 &format_args!("(len = {})", self.wake_queue.len()),
131 )
132 .finish()
133 }
134}
135
136/// State of a task to be executed
137struct Task<'a> {
138 /// Shared state of the executor for running this task
139 executor: Weak<RefCell<ExecutorState<'a>>>,
140
141 /// The task to be executed
142 ///
143 /// This value becomes `None` when the task is completed to prevent polling
144 /// it again.
145 future: RefCell<Option<Pin<Box<dyn Future<Output = ()> + 'a>>>>,
146}
147
148pub mod forwarder;
149
150mod executor;
151mod spawner;
152mod task;
153mod waker;
154
155pub use spawner::SpawnError;