yash_executor/executor.rs
1// This file is part of yash, an extended POSIX shell.
2// Copyright (C) 2024 WATANABE Yuki
3
4//! Implementation of `Executor`
5
6use crate::forwarder::{forwarder, Receiver};
7use crate::{Executor, ExecutorState, Spawner, Task};
8use alloc::boxed::Box;
9use alloc::rc::Rc;
10use core::cell::RefCell;
11use core::future::{Future, IntoFuture};
12use core::pin::Pin;
13
14impl<'a> Executor<'a> {
15 /// Creates a new `Executor` with an empty task queue.
16 #[must_use]
17 pub fn new() -> Self {
18 Self::default()
19 }
20
21 /// Returns the number of tasks that have been woken up but not yet polled.
22 #[must_use]
23 pub fn wake_count(&self) -> usize {
24 self.state.borrow().wake_queue.len()
25 }
26
27 /// Adds a task to the task queue.
28 ///
29 /// The added task is not polled immediately. It will be polled when the
30 /// executor runs tasks.
31 ///
32 /// # Safety
33 ///
34 /// It may be surprising that this method is unsafe. The reason is that the
35 /// `Waker` available in the `Context` passed to the future's `poll` method
36 /// is thread-unsafe despite `Waker` being `Send` and `Sync`. The `Waker` is
37 /// not protected by a lock or atomic operation, and it is your sole
38 /// responsibility to ensure that the `Waker` is not passed to or accessed
39 /// from other threads.
40 pub unsafe fn spawn_pinned(&self, future: Pin<Box<dyn Future<Output = ()> + 'a>>) {
41 ExecutorState::enqueue(&self.state, future);
42 }
43
44 /// Adds a task to the task queue.
45 ///
46 /// This method is an extended version of [`spawn_pinned`] that can take a
47 /// non-pinned future and may return a non-unit output. The result of the
48 /// future will be sent to the returned receiver.
49 ///
50 /// The added task is not polled immediately. It will be polled when the
51 /// executor runs tasks.
52 ///
53 /// # Safety
54 ///
55 /// See [`spawn_pinned`] for safety considerations.
56 ///
57 /// [`spawn_pinned`]: Self::spawn_pinned
58 pub unsafe fn spawn<F, T>(&self, future: F) -> Receiver<T>
59 where
60 F: IntoFuture<Output = T> + 'a,
61 T: 'a,
62 {
63 ExecutorState::enqueue_forwarding(&self.state, future)
64 }
65
66 /// Returns a `Spawner` that can spawn tasks.
67 #[must_use]
68 pub fn spawner(&self) -> Spawner<'a> {
69 let state = Rc::downgrade(&self.state);
70 Spawner { state }
71 }
72
73 /// Runs a task that has been woken up.
74 ///
75 /// This method removes a single task from the task queue and polls it.
76 /// Returns:
77 /// - `Some(true)` if the task is complete
78 /// - `Some(false)` if the task is not complete
79 /// - `None` if there are no tasks to run
80 ///
81 /// This method panics if the task is polled recursively.
82 pub fn step(&self) -> Option<bool> {
83 let task = self.state.borrow_mut().wake_queue.pop_front()?;
84 Some(task.poll())
85 }
86
87 /// Runs tasks until there are no more tasks to run.
88 ///
89 /// This method repeatedly calls `step` until it returns `None`, that is,
90 /// there are no more tasks that have been woken up. Returns the number of
91 /// completed tasks.
92 ///
93 /// This method panics if a task is polled recursively.
94 pub fn run_until_stalled(&self) -> usize {
95 let mut completed = 0;
96 while let Some(is_complete) = self.step() {
97 if is_complete {
98 completed += 1;
99 }
100 }
101 completed
102 }
103}
104
105impl<'a> ExecutorState<'a> {
106 pub(crate) fn enqueue(
107 this: &Rc<RefCell<Self>>,
108 future: Pin<Box<dyn Future<Output = ()> + 'a>>,
109 ) {
110 let task = Task {
111 executor: Rc::downgrade(this),
112 future: RefCell::new(Some(future)),
113 };
114 this.borrow_mut().wake_queue.push_back(Rc::new(task));
115 }
116
117 pub(crate) fn enqueue_forwarding<F, T>(this: &Rc<RefCell<Self>>, future: F) -> Receiver<T>
118 where
119 F: IntoFuture<Output = T> + 'a,
120 T: 'a,
121 {
122 let (sender, receiver) = forwarder();
123 let task = Task {
124 executor: Rc::downgrade(this),
125 future: RefCell::new(Some(Box::pin(async move {
126 sender.send(future.await).unwrap_or_default()
127 }))),
128 };
129 this.borrow_mut().wake_queue.push_back(Rc::new(task));
130 receiver
131 }
132}