Skip to main content

yash_env/system/concurrency/
run_virtual.rs

// This file is part of yash, an extended POSIX shell.
// Copyright (C) 2026 WATANABE Yuki
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.

//! Methods for running tasks with concurrency (`VirtualSystem`-specific)
18
19use super::super::r#virtual::VirtualSystem;
20use super::Concurrent;
21use super::RunLoop;
22use super::Select as _;
23use crate::job::ProcessState;
24use futures_util::{pending, poll};
25use std::pin::pin;
26
27impl Concurrent<VirtualSystem> {
28    /// Runs the given task with concurrency support.
29    ///
30    /// This function implements the main loop of the shell process. It runs the
31    /// given task while also calling [`select`](super::Select::select) to handle signals
32    /// and other events. The task is expected to perform I/O operations using
33    /// the methods of this `Concurrent` instance, so that it can yield when the
34    /// operations would block. The function returns when the task completes or
35    /// the process is terminated.
36    ///
37    /// This is the `VirtualSystem` counterpart for the
38    /// [`run_real`](Self::run_real) method. To allow `VirtualSystem` to run
39    /// multiple tasks concurrently, this method is asynchronous and returns a
40    /// future that completes when the task finishes or the process is
41    /// terminated.
42    pub async fn run_virtual<F>(&self, task: F)
43    where
44        F: Future<Output = ()>,
45    {
46        let mut task = pin!(task);
47        while poll!(&mut task).is_pending() {
48            let state = self.inner.current_process().state();
49            match state {
50                ProcessState::Running => {
51                    // The process is running, but the task is not ready yet, so we need to wait
52                    // for it to become ready. Proceed to the `select` call below.
53                }
54                ProcessState::Halted(result) => {
55                    if result.is_stopped() {
56                        // The process is stopped while the task is still working.
57                        let terminated = self.inner.block_while_stopped().await;
58                        if !terminated {
59                            // The process has been resumed, so we can continue running the task.
60                            continue;
61                        }
62                    }
63                    // The process has been terminated, so we simply abort the task.
64                    return;
65                }
66            }
67
68            let mut select = pin!(self.select());
69            while poll!(&mut select).is_pending() {
70                let state = self.inner.current_process().state();
71                match state {
72                    ProcessState::Running => {
73                        // The process is running, but the select call is not ready yet, so we need
74                        // to wait for it to become ready. Here we propagate the pending state to
75                        // the caller to yield to other processes.
76                        pending!()
77                    }
78                    ProcessState::Halted(result) => {
79                        if result.is_stopped() {
80                            // The process is stopped while we are waiting for the select call.
81                            let terminated = self.inner.block_while_stopped().await;
82                            if !terminated {
83                                // The process has been resumed, so we can continue waiting
84                                // for the select call.
85                                continue;
86                            }
87                        }
88                        // The process has been terminated, so we simply abort the task.
89                        return;
90                    }
91                }
92            }
93        }
94    }
95}
96
97/// Allows the [`Concurrent::run_virtual`] method to be used via the [`RunLoop`] trait.
98impl RunLoop for VirtualSystem {
99    /// Calls [`Concurrent::run_virtual`].
100    #[inline(always)]
101    fn run_loop<'c, F>(
102        concurrent: &'c Concurrent<Self>,
103        task: F,
104    ) -> impl Future<Output = ()> + use<'c, F>
105    where
106        F: Future<Output = ()>,
107    {
108        concurrent.run_virtual(task)
109    }
110}
111
#[cfg(test)]
mod tests {
    use super::super::Sleep as _;
    use super::*;
    use crate::semantics::ExitStatus;
    use crate::system::r#virtual::{SIGCONT, SIGKILL, SIGSTOP};
    use crate::system::{Exit as _, SendSignal as _};
    use crate::test_helper::WakeFlag;
    use futures_util::FutureExt as _;
    use std::cell::Cell;
    use std::rc::Rc;
    use std::sync::Arc;
    use std::task::Poll::{Pending, Ready};
    use std::task::{Context, Waker};
    use std::time::{Duration, Instant};

    /// Sleep length used by the tasks in these tests.
    const ONE_SECOND: Duration = Duration::from_secs(1);

    /// Guard that records its own destruction by setting the shared flag.
    struct DropFlag(Rc<Cell<bool>>);

    impl Drop for DropFlag {
        fn drop(&mut self) {
            self.0.set(true);
        }
    }

    /// Builds a virtual system whose clock is set, returning it together with
    /// the instant the clock was set to.
    fn virtual_system_with_current_time() -> (Concurrent<VirtualSystem>, Instant) {
        let inner = VirtualSystem::new();
        let start = Instant::now();
        inner.state.borrow_mut().now = Some(start);
        let system = Concurrent::new(inner);
        (system, start)
    }

    /// Creates a task context whose waker does nothing.
    fn noop_context() -> Context<'static> {
        Context::from_waker(Waker::noop())
    }

    /// Creates a waker together with the flag that reports whether it has
    /// been woken.
    fn waker_with_flag() -> (Arc<WakeFlag>, Waker) {
        let flag = Arc::new(WakeFlag::new());
        let waker = Waker::from(Arc::clone(&flag));
        (flag, waker)
    }

    /// Moves the virtual clock of `system` to `instant`.
    fn advance_time_to(system: &Concurrent<VirtualSystem>, instant: Instant) {
        system.inner.state.borrow_mut().advance_time(instant);
    }

    /// Overwrites the state of the current process, ignoring the result.
    fn set_process_state(system: &Concurrent<VirtualSystem>, state: ProcessState) {
        let _ = system.inner.current_process_mut().set_state(state);
    }

    #[test]
    fn run_virtual_returns_immediately_when_task_is_ready_on_first_poll() {
        let system = Concurrent::new(VirtualSystem::new());
        let ran = Cell::new(false);

        let outcome = system.run_virtual(async { ran.set(true) }).now_or_never();

        assert_eq!(outcome, Some(()));
        assert!(ran.get());
    }

    #[test]
    fn run_virtual_completes_normally_when_task_alternates_between_pending_and_ready() {
        let (system, start) = virtual_system_with_current_time();
        let progress = Rc::new(Cell::new(0));
        let progress_in_task = Rc::clone(&progress);
        let mut run = pin!(system.run_virtual(async {
            progress_in_task.set(1);
            system.sleep(ONE_SECOND).await;
            progress_in_task.set(2);
            system.sleep(ONE_SECOND).await;
            progress_in_task.set(3);
        }));
        let mut context = noop_context();

        // First poll: the task runs up to its first sleep.
        assert_eq!(run.as_mut().poll(&mut context), Pending);
        assert_eq!(progress.get(), 1);

        // After one second the task advances to its second sleep.
        advance_time_to(&system, start + Duration::from_secs(1));
        assert_eq!(run.as_mut().poll(&mut context), Pending);
        assert_eq!(progress.get(), 2);

        // After two seconds the task finishes.
        advance_time_to(&system, start + Duration::from_secs(2));
        assert_eq!(run.as_mut().poll(&mut context), Ready(()));
        assert_eq!(progress.get(), 3);
    }

    #[test]
    fn run_virtual_waits_on_select_while_process_is_running_and_task_is_pending() {
        let (system, start) = virtual_system_with_current_time();
        let completed = Rc::new(Cell::new(false));
        let completed_in_task = Rc::clone(&completed);
        let mut run = pin!(system.run_virtual(async {
            system.sleep(ONE_SECOND).await;
            completed_in_task.set(true);
        }));

        let (first_flag, first_waker) = waker_with_flag();
        let mut context = Context::from_waker(&first_waker);
        assert_eq!(run.as_mut().poll(&mut context), Pending);
        assert!(!completed.get());
        assert!(!first_flag.is_woken());

        // Reaching the deadline must wake the waker used for the first poll.
        advance_time_to(&system, start + ONE_SECOND);
        assert!(first_flag.is_woken());

        let (second_flag, second_waker) = waker_with_flag();
        let mut context = Context::from_waker(&second_waker);
        assert_eq!(run.as_mut().poll(&mut context), Ready(()));
        assert!(completed.get());
        assert!(!second_flag.is_woken());
    }

    #[test]
    fn run_virtual_yields_pending_to_caller_while_waiting_on_pending_select_in_running_state() {
        let (system, _start) = virtual_system_with_current_time();
        let completed = Rc::new(Cell::new(false));
        let completed_in_task = Rc::clone(&completed);
        let mut run = pin!(system.run_virtual(async {
            system.sleep(ONE_SECOND).await;
            completed_in_task.set(true);
        }));
        let mut context = noop_context();

        // Time never advances, so every poll must report pending.
        assert_eq!(run.as_mut().poll(&mut context), Pending);
        assert_eq!(run.as_mut().poll(&mut context), Pending);
        assert!(!completed.get());
    }

    #[test]
    fn run_virtual_aborts_task_when_process_is_already_terminated_before_entering_select() {
        let system = Concurrent::new(VirtualSystem::new());
        let dropped = Rc::new(Cell::new(false));
        let dropped_in_task = Rc::clone(&dropped);
        let mut run = pin!(system.run_virtual(async {
            let _guard = DropFlag(dropped_in_task);
            system.exit(ExitStatus(42)).await;
        }));
        let mut context = noop_context();

        assert_eq!(run.as_mut().poll(&mut context), Ready(()));
        assert!(dropped.get());
    }

    #[test]
    fn run_virtual_blocks_while_stopped_before_select_and_resumes_task_when_process_is_continued() {
        let system = Concurrent::new(VirtualSystem::new());
        let completed = Rc::new(Cell::new(false));
        let completed_in_task = Rc::clone(&completed);
        let mut run = pin!(system.run_virtual(async {
            system.raise(SIGSTOP).await.unwrap();
            completed_in_task.set(true);
        }));
        let mut context = noop_context();

        assert_eq!(run.as_mut().poll(&mut context), Pending);
        assert_eq!(
            system.inner.current_process().state(),
            ProcessState::stopped(SIGSTOP),
        );
        assert!(!completed.get());

        let _ = system.inner.current_process_mut().raise_signal(SIGCONT);
        assert_eq!(run.as_mut().poll(&mut context), Ready(()));
        assert!(completed.get());
    }

    #[test]
    fn run_virtual_blocks_while_stopped_before_select_and_aborts_when_process_terminates() {
        let system = Concurrent::new(VirtualSystem::new());
        let dropped = Rc::new(Cell::new(false));
        let dropped_in_task = Rc::clone(&dropped);
        let mut run = pin!(system.run_virtual(async {
            let _guard = DropFlag(dropped_in_task);
            system.raise(SIGSTOP).await.unwrap();
            unreachable!("task should be aborted while stopped");
        }));
        let mut context = noop_context();

        assert_eq!(run.as_mut().poll(&mut context), Pending);
        assert!(!dropped.get());

        let _ = system.inner.current_process_mut().raise_signal(SIGKILL);
        assert_eq!(run.as_mut().poll(&mut context), Ready(()));
        assert!(dropped.get());
    }

    #[test]
    fn run_virtual_blocks_while_stopped_during_pending_select_and_continues_waiting_after_resume() {
        let (system, start) = virtual_system_with_current_time();
        let completed = Rc::new(Cell::new(false));
        let completed_in_task = Rc::clone(&completed);
        let mut run = pin!(system.run_virtual(async {
            system.sleep(ONE_SECOND).await;
            completed_in_task.set(true);
        }));
        let mut context = noop_context();

        assert_eq!(run.as_mut().poll(&mut context), Pending);

        set_process_state(&system, ProcessState::stopped(SIGSTOP));
        assert_eq!(run.as_mut().poll(&mut context), Pending);
        assert!(!completed.get());

        // The deadline passes while the process is stopped: still pending.
        advance_time_to(&system, start + ONE_SECOND);
        assert_eq!(run.as_mut().poll(&mut context), Pending);
        assert!(!completed.get());

        set_process_state(&system, ProcessState::Running);
        assert_eq!(run.as_mut().poll(&mut context), Ready(()));
        assert!(completed.get());
    }

    #[test]
    fn run_virtual_blocks_while_stopped_during_pending_select_and_aborts_when_terminated() {
        let (system, _start) = virtual_system_with_current_time();
        let dropped = Rc::new(Cell::new(false));
        let dropped_in_task = Rc::clone(&dropped);
        let mut run = pin!(system.run_virtual(async {
            let _guard = DropFlag(dropped_in_task);
            system.sleep(ONE_SECOND).await;
            unreachable!("task should be aborted while sleeping");
        }));
        let mut context = noop_context();

        assert_eq!(run.as_mut().poll(&mut context), Pending);

        set_process_state(&system, ProcessState::stopped(SIGSTOP));
        assert_eq!(run.as_mut().poll(&mut context), Pending);
        assert!(!dropped.get());

        let _ = system.inner.current_process_mut().raise_signal(SIGKILL);
        assert_eq!(run.as_mut().poll(&mut context), Ready(()));
        assert!(dropped.get());
    }

    #[test]
    fn run_virtual_aborts_immediately_when_process_becomes_terminated_while_waiting_on_pending_select()
    {
        let (system, _start) = virtual_system_with_current_time();
        let dropped = Rc::new(Cell::new(false));
        let dropped_in_task = Rc::clone(&dropped);
        let mut run = pin!(system.run_virtual(async {
            let _guard = DropFlag(dropped_in_task);
            system.sleep(ONE_SECOND).await;
            unreachable!("task should be aborted while sleeping");
        }));
        let mut context = noop_context();

        assert_eq!(run.as_mut().poll(&mut context), Pending);
        assert!(!dropped.get());

        let _ = system.inner.current_process_mut().raise_signal(SIGKILL);
        assert_eq!(run.as_mut().poll(&mut context), Ready(()));
        assert!(dropped.get());
    }
}
375}