Skip to main content

yash_env/system/concurrency/
run.rs

1// This file is part of yash, an extended POSIX shell.
2// Copyright (C) 2026 WATANABE Yuki
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13//
14// You should have received a copy of the GNU General Public License
15// along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17//! Methods for running tasks with concurrency
18
19#[cfg(unix)]
20use super::super::real::RealSystem;
21use super::super::r#virtual::VirtualSystem;
22use super::Concurrent;
23use crate::job::ProcessState;
24use futures_util::{pending, poll};
25use std::pin::pin;
26
27#[cfg(unix)]
28impl Concurrent<RealSystem> {
29    /// Runs the given task with concurrency support.
30    ///
31    /// This function implements the main loop of the shell process. It runs the
32    /// given task while also calling [`select`](Self::select) to handle signals
33    /// and other events. The task is expected to perform I/O operations using
34    /// the methods of this `Concurrent` instance, so that it can yield when the
35    /// operations would block. The function returns the output of the task when
36    /// it completes.
37    ///
38    /// This method supports concurrency only inside the task. Other tasks
39    /// created outside the task will not be run concurrently.
40    /// This method blocks the current thread until the task completes, so it
41    /// should only be called in the main function of the shell process.
42    /// See the [`run_virtual`](Self::run_virtual) method for the
43    /// `VirtualSystem` counterpart.
44    pub fn run_real<F, T>(&self, task: F) -> T
45    where
46        F: Future<Output = T>,
47    {
48        use std::task::Poll::{Pending, Ready};
49        use std::task::{Context, Waker};
50
51        let runner = pin!(async move {
52            let mut task = pin!(task);
53            loop {
54                if let Ready(result) = poll!(&mut task) {
55                    return result;
56                }
57                self.select().await;
58            }
59        });
60        match runner.poll(&mut Context::from_waker(Waker::noop())) {
61            Ready(result) => result,
62            Pending => unreachable!("`RealSystem::select` should never return `Pending`"),
63        }
64    }
65}
66
67impl Concurrent<VirtualSystem> {
68    /// Runs the given task with concurrency support.
69    ///
70    /// This function implements the main loop of the shell process. It runs the
71    /// given task while also calling [`select`](Self::select) to handle signals
72    /// and other events. The task is expected to perform I/O operations using
73    /// the methods of this `Concurrent` instance, so that it can yield when the
74    /// operations would block. The function returns the output of the task when
75    /// it completes.
76    ///
77    /// This is the `VirtualSystem` counterpart for the
78    /// [`run_real`](Self::run_real) method. To allow `VirtualSystem` to run
79    /// multiple tasks concurrently, this method is asynchronous and returns a
80    /// future that completes when the task finishes or the process is
81    /// terminated.
82    pub async fn run_virtual<F>(&self, task: F)
83    where
84        F: Future<Output = ()>,
85    {
86        let mut task = pin!(task);
87        while poll!(&mut task).is_pending() {
88            let state = self.inner.current_process().state();
89            match state {
90                ProcessState::Running => {
91                    // The process is running, but the task is not ready yet, so we need to wait
92                    // for it to become ready. Proceed to the `select` call below.
93                }
94                ProcessState::Halted(result) => {
95                    if result.is_stopped() {
96                        // The process is stopped while the task is still working.
97                        let terminated = self.inner.block_while_stopped().await;
98                        if !terminated {
99                            // The process has been resumed, so we can continue running the task.
100                            continue;
101                        }
102                    }
103                    // The process has been terminated, so we simply abort the task.
104                    return;
105                }
106            }
107
108            let mut select = pin!(self.select());
109            while poll!(&mut select).is_pending() {
110                let state = self.inner.current_process().state();
111                match state {
112                    ProcessState::Running => {
113                        // The process is running, but the select call is not ready yet, so we need
114                        // to wait for it to become ready. Here we propagate the pending state to
115                        // the caller to yield to other processes.
116                        pending!()
117                    }
118                    ProcessState::Halted(result) => {
119                        if result.is_stopped() {
120                            // The process is stopped while we are waiting for the select call.
121                            let terminated = self.inner.block_while_stopped().await;
122                            if !terminated {
123                                // The process has been resumed, so we can continue waiting
124                                // for the select call.
125                                continue;
126                            }
127                        }
128                        // The process has been terminated, so we simply abort the task.
129                        return;
130                    }
131                }
132            }
133        }
134    }
135}
136
137#[cfg(test)]
138mod tests {
139    use super::*;
140
141    #[cfg(unix)]
142    mod real_system {
143        use super::*;
144        use std::cell::Cell;
145        use std::time::Duration;
146
147        #[test]
148        fn run_real_returns_task_output_immediately_if_ready_on_first_poll() {
149            let system = Concurrent::new(unsafe { RealSystem::new() });
150            let result = system.run_real(async { 42 });
151            assert_eq!(result, 42);
152        }
153
154        #[test]
155        fn run_real_keeps_polling_task_until_completion_when_task_yields_multiple_times() {
156            let system = Concurrent::new(unsafe { RealSystem::new() });
157            let progress = Cell::new(0);
158
159            let result = system.run_real(async {
160                progress.set(1);
161                system.sleep(Duration::from_millis(1)).await;
162                progress.set(2);
163                system.sleep(Duration::from_millis(1)).await;
164                progress.set(3);
165                42
166            });
167
168            assert_eq!(result, 42);
169            assert_eq!(progress.get(), 3);
170        }
171
172        #[test]
173        fn run_real_calls_select_between_task_polls_while_task_is_pending() {
174            let system = Concurrent::new(unsafe { RealSystem::new() });
175            let progress = Cell::new(0);
176
177            let result = system.run_real(async {
178                progress.set(1);
179                system.sleep(Duration::from_millis(1)).await;
180                progress.set(2);
181                7
182            });
183
184            assert_eq!(result, 7);
185            assert_eq!(progress.get(), 2);
186        }
187
188        #[test]
189        #[should_panic = "boom"]
190        fn run_real_propagates_task_panic_to_caller() {
191            let system = Concurrent::new(unsafe { RealSystem::new() });
192            system.run_real(async { panic!("boom") })
193        }
194    }
195
196    mod virtual_system {
197        use super::*;
198        use crate::semantics::ExitStatus;
199        use crate::system::r#virtual::{SIGCONT, SIGKILL, SIGSTOP};
200        use crate::system::{Exit as _, SendSignal as _};
201        use crate::test_helper::WakeFlag;
202        use futures_util::FutureExt as _;
203        use std::cell::Cell;
204        use std::rc::Rc;
205        use std::sync::Arc;
206        use std::task::Poll::{Pending, Ready};
207        use std::task::{Context, Waker};
208        use std::time::{Duration, Instant};
209
210        struct DropFlag(Rc<Cell<bool>>);
211
212        impl Drop for DropFlag {
213            fn drop(&mut self) {
214                self.0.set(true);
215            }
216        }
217
218        fn virtual_system_with_current_time() -> (Concurrent<VirtualSystem>, Instant) {
219            let inner = VirtualSystem::new();
220            let now = Instant::now();
221            inner.state.borrow_mut().now = Some(now);
222            (Concurrent::new(inner), now)
223        }
224
225        #[test]
226        fn run_virtual_returns_immediately_when_task_is_ready_on_first_poll() {
227            let system = Concurrent::new(VirtualSystem::new());
228            let completed = Cell::new(false);
229
230            let result = system
231                .run_virtual(async { completed.set(true) })
232                .now_or_never();
233
234            assert_eq!(result, Some(()));
235            assert!(completed.get());
236        }
237
238        #[test]
239        fn run_virtual_completes_normally_when_task_alternates_between_pending_and_ready() {
240            let (system, now) = virtual_system_with_current_time();
241            let progress = Rc::new(Cell::new(0));
242            let progress_2 = Rc::clone(&progress);
243            let mut future = pin!(system.run_virtual(async {
244                progress_2.set(1);
245                system.sleep(Duration::from_secs(1)).await;
246                progress_2.set(2);
247                system.sleep(Duration::from_secs(1)).await;
248                progress_2.set(3);
249            }));
250
251            let mut context = Context::from_waker(Waker::noop());
252            assert_eq!(future.as_mut().poll(&mut context), Pending);
253            assert_eq!(progress.get(), 1);
254
255            system
256                .inner
257                .state
258                .borrow_mut()
259                .advance_time(now + Duration::from_secs(1));
260            assert_eq!(future.as_mut().poll(&mut context), Pending);
261            assert_eq!(progress.get(), 2);
262
263            system
264                .inner
265                .state
266                .borrow_mut()
267                .advance_time(now + Duration::from_secs(2));
268            assert_eq!(future.as_mut().poll(&mut context), Ready(()));
269            assert_eq!(progress.get(), 3);
270        }
271
272        #[test]
273        fn run_virtual_waits_on_select_while_process_is_running_and_task_is_pending() {
274            let (system, now) = virtual_system_with_current_time();
275            let completed = Rc::new(Cell::new(false));
276            let completed_2 = Rc::clone(&completed);
277            let mut future = pin!(system.run_virtual(async {
278                system.sleep(Duration::from_secs(1)).await;
279                completed_2.set(true);
280            }));
281
282            let wake_flag = Arc::new(WakeFlag::new());
283            let waker = Waker::from(Arc::clone(&wake_flag));
284            let mut context = Context::from_waker(&waker);
285            assert_eq!(future.as_mut().poll(&mut context), Pending);
286            assert!(!completed.get());
287            assert!(!wake_flag.is_woken());
288
289            system
290                .inner
291                .state
292                .borrow_mut()
293                .advance_time(now + Duration::from_secs(1));
294            assert!(wake_flag.is_woken());
295
296            let wake_flag = Arc::new(WakeFlag::new());
297            let waker = Waker::from(Arc::clone(&wake_flag));
298            let mut context = Context::from_waker(&waker);
299            assert_eq!(future.as_mut().poll(&mut context), Ready(()));
300            assert!(completed.get());
301            assert!(!wake_flag.is_woken());
302        }
303
304        #[test]
305        fn run_virtual_yields_pending_to_caller_while_waiting_on_pending_select_in_running_state() {
306            let (system, _now) = virtual_system_with_current_time();
307            let completed = Rc::new(Cell::new(false));
308            let completed_2 = Rc::clone(&completed);
309            let mut future = pin!(system.run_virtual(async {
310                system.sleep(Duration::from_secs(1)).await;
311                completed_2.set(true);
312            }));
313
314            let mut context = Context::from_waker(Waker::noop());
315            assert_eq!(future.as_mut().poll(&mut context), Pending);
316            assert_eq!(future.as_mut().poll(&mut context), Pending);
317            assert!(!completed.get());
318        }
319
320        #[test]
321        fn run_virtual_aborts_task_when_process_is_already_terminated_before_entering_select() {
322            let system = Concurrent::new(VirtualSystem::new());
323            let dropped = Rc::new(Cell::new(false));
324            let dropped_2 = Rc::clone(&dropped);
325            let mut future = pin!(system.run_virtual(async {
326                let _drop_flag = DropFlag(dropped_2);
327                system.exit(ExitStatus(42)).await;
328            }));
329
330            let mut context = Context::from_waker(Waker::noop());
331            assert_eq!(future.as_mut().poll(&mut context), Ready(()));
332            assert!(dropped.get());
333        }
334
335        #[test]
336        fn run_virtual_blocks_while_stopped_before_select_and_resumes_task_when_process_is_continued()
337         {
338            let system = Concurrent::new(VirtualSystem::new());
339            let completed = Rc::new(Cell::new(false));
340            let mut future = pin!(system.run_virtual(async {
341                system.raise(SIGSTOP).await.unwrap();
342                completed.set(true);
343            }));
344
345            let mut context = Context::from_waker(Waker::noop());
346            assert_eq!(future.as_mut().poll(&mut context), Pending);
347            assert_eq!(
348                system.inner.current_process().state(),
349                ProcessState::stopped(SIGSTOP),
350            );
351            assert!(!completed.get());
352
353            _ = system.inner.current_process_mut().raise_signal(SIGCONT);
354            assert_eq!(future.as_mut().poll(&mut context), Ready(()));
355            assert!(completed.get());
356        }
357
358        #[test]
359        fn run_virtual_blocks_while_stopped_before_select_and_aborts_when_process_terminates() {
360            let system = Concurrent::new(VirtualSystem::new());
361            let dropped = Rc::new(Cell::new(false));
362            let dropped_2 = Rc::clone(&dropped);
363            let mut future = pin!(system.run_virtual(async {
364                let _drop_flag = DropFlag(dropped_2);
365                system.raise(SIGSTOP).await.unwrap();
366                unreachable!("task should be aborted while stopped");
367            }));
368
369            let mut context = Context::from_waker(Waker::noop());
370            assert_eq!(future.as_mut().poll(&mut context), Pending);
371            assert!(!dropped.get());
372
373            _ = system.inner.current_process_mut().raise_signal(SIGKILL);
374            assert_eq!(future.as_mut().poll(&mut context), Ready(()));
375            assert!(dropped.get());
376        }
377
378        #[test]
379        fn run_virtual_blocks_while_stopped_during_pending_select_and_continues_waiting_after_resume()
380         {
381            let (system, now) = virtual_system_with_current_time();
382            let completed = Rc::new(Cell::new(false));
383            let completed_2 = Rc::clone(&completed);
384            let mut future = pin!(system.run_virtual(async {
385                system.sleep(Duration::from_secs(1)).await;
386                completed_2.set(true);
387            }));
388
389            let mut context = Context::from_waker(Waker::noop());
390            assert_eq!(future.as_mut().poll(&mut context), Pending);
391
392            _ = system
393                .inner
394                .current_process_mut()
395                .set_state(ProcessState::stopped(SIGSTOP));
396            assert_eq!(future.as_mut().poll(&mut context), Pending);
397            assert!(!completed.get());
398
399            system
400                .inner
401                .state
402                .borrow_mut()
403                .advance_time(now + Duration::from_secs(1));
404            assert_eq!(future.as_mut().poll(&mut context), Pending);
405            assert!(!completed.get());
406
407            _ = system
408                .inner
409                .current_process_mut()
410                .set_state(ProcessState::Running);
411            assert_eq!(future.as_mut().poll(&mut context), Ready(()));
412            assert!(completed.get());
413        }
414
415        #[test]
416        fn run_virtual_blocks_while_stopped_during_pending_select_and_aborts_when_terminated() {
417            let (system, _now) = virtual_system_with_current_time();
418            let dropped = Rc::new(Cell::new(false));
419            let mut future = pin!(system.run_virtual(async {
420                let _drop_flag = DropFlag(Rc::clone(&dropped));
421                system.sleep(Duration::from_secs(1)).await;
422                unreachable!("task should be aborted while sleeping");
423            }));
424
425            let mut context = Context::from_waker(Waker::noop());
426            assert_eq!(future.as_mut().poll(&mut context), Pending);
427
428            _ = system
429                .inner
430                .current_process_mut()
431                .set_state(ProcessState::stopped(SIGSTOP));
432            assert_eq!(future.as_mut().poll(&mut context), Pending);
433            assert!(!dropped.get());
434
435            _ = system.inner.current_process_mut().raise_signal(SIGKILL);
436            assert_eq!(future.as_mut().poll(&mut context), Ready(()));
437            assert!(dropped.get());
438        }
439
440        #[test]
441        fn run_virtual_aborts_immediately_when_process_becomes_terminated_while_waiting_on_pending_select()
442         {
443            let (system, _now) = virtual_system_with_current_time();
444            let dropped = Rc::new(Cell::new(false));
445            let mut future = pin!(system.run_virtual(async {
446                let _drop_flag = DropFlag(Rc::clone(&dropped));
447                system.sleep(Duration::from_secs(1)).await;
448                unreachable!("task should be aborted while sleeping");
449            }));
450
451            let mut context = Context::from_waker(Waker::noop());
452            assert_eq!(future.as_mut().poll(&mut context), Pending);
453            assert!(!dropped.get());
454
455            _ = system.inner.current_process_mut().raise_signal(SIGKILL);
456            assert_eq!(future.as_mut().poll(&mut context), Ready(()));
457            assert!(dropped.get());
458        }
459    }
460}