yash_executor/
executor.rs

1// This file is part of yash, an extended POSIX shell.
2// Copyright (C) 2024 WATANABE Yuki
3
4//! Implementation of `Executor`
5
6use crate::forwarder::{forwarder, Receiver};
7use crate::{Executor, ExecutorState, Spawner, Task};
8use alloc::boxed::Box;
9use alloc::rc::Rc;
10use core::cell::RefCell;
11use core::future::{Future, IntoFuture};
12use core::pin::Pin;
13
14impl<'a> Executor<'a> {
15    /// Creates a new `Executor` with an empty task queue.
16    #[must_use]
17    pub fn new() -> Self {
18        Self::default()
19    }
20
21    /// Returns the number of tasks that have been woken up but not yet polled.
22    #[must_use]
23    pub fn wake_count(&self) -> usize {
24        self.state.borrow().wake_queue.len()
25    }
26
27    /// Adds a task to the task queue.
28    ///
29    /// The added task is not polled immediately. It will be polled when the
30    /// executor runs tasks.
31    ///
32    /// # Safety
33    ///
34    /// It may be surprising that this method is unsafe. The reason is that the
35    /// `Waker` available in the `Context` passed to the future's `poll` method
36    /// is thread-unsafe despite `Waker` being `Send` and `Sync`. The `Waker` is
37    /// not protected by a lock or atomic operation, and it is your sole
38    /// responsibility to ensure that the `Waker` is not passed to or accessed
39    /// from other threads.
40    pub unsafe fn spawn_pinned(&self, future: Pin<Box<dyn Future<Output = ()> + 'a>>) {
41        ExecutorState::enqueue(&self.state, future);
42    }
43
44    /// Adds a task to the task queue.
45    ///
46    /// This method is an extended version of [`spawn_pinned`] that can take a
47    /// non-pinned future and may return a non-unit output. The result of the
48    /// future will be sent to the returned receiver.
49    ///
50    /// The added task is not polled immediately. It will be polled when the
51    /// executor runs tasks.
52    ///
53    /// # Safety
54    ///
55    /// See [`spawn_pinned`] for safety considerations.
56    ///
57    /// [`spawn_pinned`]: Self::spawn_pinned
58    pub unsafe fn spawn<F, T>(&self, future: F) -> Receiver<T>
59    where
60        F: IntoFuture<Output = T> + 'a,
61        T: 'a,
62    {
63        ExecutorState::enqueue_forwarding(&self.state, future)
64    }
65
66    /// Returns a `Spawner` that can spawn tasks.
67    #[must_use]
68    pub fn spawner(&self) -> Spawner<'a> {
69        let state = Rc::downgrade(&self.state);
70        Spawner { state }
71    }
72
73    /// Runs a task that has been woken up.
74    ///
75    /// This method removes a single task from the task queue and polls it.
76    /// Returns:
77    /// - `Some(true)` if the task is complete
78    /// - `Some(false)` if the task is not complete
79    /// - `None` if there are no tasks to run
80    ///
81    /// This method panics if the task is polled recursively.
82    pub fn step(&self) -> Option<bool> {
83        let task = self.state.borrow_mut().wake_queue.pop_front()?;
84        Some(task.poll())
85    }
86
87    /// Runs tasks until there are no more tasks to run.
88    ///
89    /// This method repeatedly calls `step` until it returns `None`, that is,
90    /// there are no more tasks that have been woken up. Returns the number of
91    /// completed tasks.
92    ///
93    /// This method panics if a task is polled recursively.
94    pub fn run_until_stalled(&self) -> usize {
95        let mut completed = 0;
96        while let Some(is_complete) = self.step() {
97            if is_complete {
98                completed += 1;
99            }
100        }
101        completed
102    }
103}
104
105impl<'a> ExecutorState<'a> {
106    pub(crate) fn enqueue(
107        this: &Rc<RefCell<Self>>,
108        future: Pin<Box<dyn Future<Output = ()> + 'a>>,
109    ) {
110        let task = Task {
111            executor: Rc::downgrade(this),
112            future: RefCell::new(Some(future)),
113        };
114        this.borrow_mut().wake_queue.push_back(Rc::new(task));
115    }
116
117    pub(crate) fn enqueue_forwarding<F, T>(this: &Rc<RefCell<Self>>, future: F) -> Receiver<T>
118    where
119        F: IntoFuture<Output = T> + 'a,
120        T: 'a,
121    {
122        let (sender, receiver) = forwarder();
123        let task = Task {
124            executor: Rc::downgrade(this),
125            future: RefCell::new(Some(Box::pin(async move {
126                sender.send(future.await).unwrap_or_default()
127            }))),
128        };
129        this.borrow_mut().wake_queue.push_back(Rc::new(task));
130        receiver
131    }
132}