Skip to main content

yash_env/system/
concurrency.rs

1// This file is part of yash, an extended POSIX shell.
2// Copyright (C) 2026 WATANABE Yuki
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13//
14// You should have received a copy of the GNU General Public License
15// along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17//! Items for concurrent task execution
18
19use super::{CaughtSignals, Clock, Errno, Fcntl, Read, Result, Select, Write};
20use crate::io::Fd;
21use crate::waker::{ScheduledWakerQueue, WakerSet};
22use std::cell::{Cell, LazyCell, OnceCell, RefCell};
23use std::collections::HashMap;
24use std::future::poll_fn;
25use std::ops::{Deref, DerefMut};
26use std::pin::pin;
27use std::rc::{Rc, Weak};
28use std::task::Poll::{Pending, Ready};
29use std::task::{Context, Waker};
30use std::time::{Duration, Instant};
31
32/// Decorator for systems that makes blocking I/O operations concurrency-friendly
33///
34/// This struct is used as a wrapper for systems for enabling concurrent
35/// execution of multiple possibly blocking I/O tasks on a single thread. The
36/// inner system is expected to implement the [`Read`], [`Write`], and
37/// [`Select`] traits with synchronous (blocking) behavior. This struct leaves
38/// [`Future`]s returned by I/O methods pending until the I/O operation is ready
39/// to avoid blocking the entire process. This allows you to start multiple I/O
40/// tasks and wait for them to complete concurrently on a single thread. This
41/// struct also provides methods for waiting for signals and waiting for a
42/// specified duration, which are represented as [`Future`]s as well. The
43/// [`select`](Self::select) method of this struct consolidates blocking
44/// behavior into a single system call so that the process can resume execution
45/// as soon as any of the specified events occurs.
46///
47/// For system calls that do not block, such as [`Pipe`], the wrapper directly
48/// forwards the call to the inner system without any modification.
49///
50/// This struct is designed to be used in an [`Rc`] to allow multiple tasks to
51/// share the same concurrent system. Some traits, such as [`Read`] and
52/// [`Write`], are implemented for `Rc<Concurrent<S>>` instead of
53/// `Concurrent<S>` to allow the methods to return futures that capture a clone
54/// of the `Rc` and keep it alive until the operation is finished. This is
55/// necessary because the futures need to access the internal state of the
56/// `Concurrent` system without capturing a reference to the original
57/// `Concurrent` struct, which may not live long enough.
58///
59/// The following example illustrates how multiple concurrent tasks are run in a
60/// single-threaded pool:
61///
62/// ```
63/// # use std::rc::Rc;
64/// # use yash_env::system::{Concurrent, Pipe as _, Read as _, Write as _};
65/// # use yash_env::VirtualSystem;
66/// # use futures_util::task::LocalSpawnExt as _;
67/// let system = Rc::new(Concurrent::new(VirtualSystem::new()));
68/// let system2 = system.clone();
69/// let system3 = system.clone();
70/// let (reader, writer) = system.pipe().unwrap();
71/// let mut executor = futures_executor::LocalPool::new();
72///
73/// // We add a task that tries to read from the pipe, but nothing has been
74/// // written to it, so the task is stalled.
75/// let read_task = executor.spawner().spawn_local_with_handle(async move {
76///     let mut buffer = [0; 1];
77///     system.read(reader, &mut buffer).await.unwrap();
78///     buffer[0]
79/// });
80/// executor.run_until_stalled();
81///
82/// // Let's add a task that writes to the pipe.
83/// executor.spawner().spawn_local(async move {
84///     system2.write_all(writer, &[123]).await.unwrap();
85/// });
86/// executor.run_until_stalled();
87///
88/// // The write task has written a byte to the pipe, but the read task is still
89/// // stalled. We need to wake it up by calling `select` or `peek`.
90/// system3.peek();
91///
92/// // Now the read task can proceed to the end.
93/// let number = executor.run_until(read_task.unwrap());
94/// assert_eq!(number, 123);
95/// ```
96///
97/// [`Pipe`]: super::Pipe
98#[derive(Clone, Debug, Default)]
99pub struct Concurrent<S> {
100    inner: S,
101    state: RefCell<State>,
102}
103
104/// Internal state for `Concurrent` system
105#[derive(Clone, Debug, Default)]
106struct State {
107    /// Wakers for tasks waiting for read readiness on each file descriptor
108    reads: HashMap<Fd, WakerSet>,
109    /// Wakers for tasks waiting for write readiness on each file descriptor
110    writes: HashMap<Fd, WakerSet>,
111    /// Wakers for tasks waiting for a timeout to elapse
112    timeouts: ScheduledWakerQueue,
113
114    /// Wakers for tasks waiting for signals to be delivered
115    catches: WakerSet,
116    /// Shared placeholder for a list of next delivered signals
117    ///
118    /// Tasks waiting for signals must retain a strong reference to this list to
119    /// see the delivered signals when they are woken up. The list is allocated
120    /// when the first task starts waiting for signals and is shared among all
121    /// waiting tasks. The list is filled when signals are delivered.
122    signals: Weak<SignalList>,
123    /// Signal mask for the `select` method
124    ///
125    /// This cache is initialized from the signal mask the shell inherited from
126    /// the parent shell and then updated by [`Concurrent::sigmask`] for use by
127    /// `select`. In particular, signals the shell wants to catch are removed
128    /// from this mask so they can interrupt `select`. The value is `None` until
129    /// the signal mask is first updated by [`Concurrent::sigmask`].
130    select_mask: Option<Vec<crate::signal::Number>>,
131}
132
133impl<S> Concurrent<S> {
134    /// Creates a new `Concurrent` system that wraps the given inner system.
135    #[must_use]
136    pub fn new(inner: S) -> Self {
137        let state = Default::default();
138        Self { inner, state }
139    }
140}
141
142/// Reads from a file descriptor concurrently.
143///
144/// The `read` method internally uses [`Fcntl::get_and_set_nonblocking`] to
145/// temporarily set the file descriptor to non-blocking mode while performing
146/// the read operation. If the read operation would block (i.e., returns
147/// `EAGAIN` or `EWOULDBLOCK`), the method registers the current task's waker in
148/// the internal state so that it can be woken up by
149/// [`select`](Concurrent::select) when the file descriptor becomes ready for
150/// reading.
151impl<S> Read for Rc<Concurrent<S>>
152where
153    S: Fcntl + Read,
154{
155    fn read<'a>(
156        &self,
157        fd: Fd,
158        buffer: &'a mut [u8],
159    ) -> impl Future<Output = Result<usize>> + use<'a, S> {
160        let this = Rc::clone(self);
161        async move {
162            let this = TemporaryNonBlockingGuard::new(&this, fd);
163            let waker = LazyCell::default();
164            loop {
165                match this.inner.read(fd, buffer).await {
166                    // EWOULDBLOCK is unreachable if it has the same value as EAGAIN.
167                    #[allow(unreachable_patterns)]
168                    Err(Errno::EAGAIN | Errno::EWOULDBLOCK | Errno::EINTR) => {
169                        this.yield_for_read(fd, &waker).await
170                    }
171
172                    result => return result,
173                }
174            }
175        }
176    }
177}
178
179/// Writes to a file descriptor concurrently.
180///
181/// The `write` method internally uses [`Fcntl::get_and_set_nonblocking`] to
182/// temporarily set the file descriptor to non-blocking mode while performing
183/// the write operation. If the write operation would block (i.e., returns
184/// `EAGAIN` or `EWOULDBLOCK`), the method registers the current task's waker in
185/// the internal state so that it can be woken up by
186/// [`select`](Concurrent::select) when the file descriptor becomes ready for
187/// writing.
188impl<S> Write for Rc<Concurrent<S>>
189where
190    S: Fcntl + Write,
191{
192    fn write<'a>(
193        &self,
194        fd: Fd,
195        buffer: &'a [u8],
196    ) -> impl Future<Output = Result<usize>> + use<'a, S> {
197        let this = Rc::clone(self);
198        async move {
199            let this = TemporaryNonBlockingGuard::new(&this, fd);
200            let waker = LazyCell::default();
201            loop {
202                match this.inner.write(fd, buffer).await {
203                    // EWOULDBLOCK is unreachable if it has the same value as EAGAIN.
204                    #[allow(unreachable_patterns)]
205                    Err(Errno::EAGAIN | Errno::EWOULDBLOCK | Errno::EINTR) => {
206                        this.yield_for_write(fd, &waker).await
207                    }
208
209                    result => return result,
210                }
211            }
212        }
213    }
214}
215
216impl<S> Concurrent<S> {
217    async fn yield_for_read<F>(&self, fd: Fd, waker: &LazyCell<Rc<Cell<Option<Waker>>>, F>)
218    where
219        F: FnOnce() -> Rc<Cell<Option<Waker>>>,
220    {
221        self.yield_once(fd, waker, |state| &mut state.reads).await
222    }
223
224    async fn yield_for_write<F>(&self, fd: Fd, waker: &LazyCell<Rc<Cell<Option<Waker>>>, F>)
225    where
226        F: FnOnce() -> Rc<Cell<Option<Waker>>>,
227    {
228        self.yield_once(fd, waker, |state| &mut state.writes).await
229    }
230
231    /// Helper method for yielding the current task and registering its waker
232    /// for the specified file descriptor and event type (read or write)
233    async fn yield_once<F, G>(
234        &self,
235        fd: Fd,
236        waker: &LazyCell<Rc<Cell<Option<Waker>>>, F>,
237        target: G,
238    ) where
239        F: FnOnce() -> Rc<Cell<Option<Waker>>>,
240        G: Fn(&mut State) -> &mut HashMap<Fd, WakerSet>,
241    {
242        let mut first_time = true;
243        poll_fn(|context| {
244            if first_time {
245                first_time = false;
246                waker.set(Some(context.waker().clone()));
247                target(&mut self.state.borrow_mut())
248                    .entry(fd)
249                    .or_default()
250                    .insert(Rc::downgrade(waker));
251                Pending
252            } else {
253                Ready(())
254            }
255        })
256        .await
257    }
258}
259
260impl<S> Concurrent<S>
261where
262    S: Clock,
263{
264    /// Waits until the specified deadline.
265    ///
266    /// The returned future will be pending until the specified deadline is
267    /// reached, at which point it will complete.
268    pub async fn sleep_until(&self, deadline: Instant) {
269        let waker: LazyCell<Rc<Cell<Option<Waker>>>> = LazyCell::default();
270        poll_fn(|context| {
271            if self.inner.now() >= deadline {
272                Ready(())
273            } else {
274                waker.set(Some(context.waker().clone()));
275                self.state
276                    .borrow_mut()
277                    .timeouts
278                    .push(deadline, Rc::downgrade(&waker));
279                Pending
280            }
281        })
282        .await
283    }
284
285    /// Waits for the specified duration to elapse.
286    ///
287    /// The returned future will be pending until the specified duration has
288    /// elapsed, at which point it will complete.
289    pub async fn sleep(&self, duration: Duration) {
290        let now = self.inner.now();
291        let deadline = now + duration;
292        self.sleep_until(deadline).await;
293    }
294}
295
296impl<S> Concurrent<S> {
297    /// Waits for signals to be caught.
298    ///
299    /// The returned future will be pending until any signal is caught, at which
300    /// point it will complete with a list of caught signals. The list is shared
301    /// among all tasks waiting for signals, so that they can see the same list
302    /// of caught signals when they are woken up.
303    ///
304    /// Before calling this method, the caller needs to [`set_disposition`] for
305    /// the signals it wants to catch.
306    ///
307    /// If this `Concurrent` system is used in an `Env`, you should call
308    /// [`Env::wait_for_signals`](crate::Env::wait_for_signals) instead of this
309    /// method, so that the trap set can handle the signals properly.
310    ///
311    /// [`set_disposition`]: crate::trap::SignalSystem::set_disposition
312    pub async fn wait_for_signals(&self) -> Rc<SignalList> {
313        let signals = {
314            let mut state = self.state.borrow_mut();
315            state.signals.upgrade().unwrap_or_else(|| {
316                let signals = Rc::new(SignalList::new());
317                state.signals = Rc::downgrade(&signals);
318                signals
319            })
320        };
321
322        let waker: LazyCell<Rc<Cell<Option<Waker>>>> = LazyCell::default();
323        poll_fn(|context| {
324            if signals.0.get().is_some() {
325                Ready(())
326            } else {
327                waker.set(Some(context.waker().clone()));
328                self.state
329                    .borrow_mut()
330                    .catches
331                    .insert(Rc::downgrade(&waker));
332                Pending
333            }
334        })
335        .await;
336
337        signals
338    }
339}
340
341impl<S> Concurrent<S>
342where
343    S: CaughtSignals + Clock + Select,
344{
345    /// Peeks for any ready events without blocking.
346    ///
347    /// This method performs a `select` system call with the file descriptors
348    /// and timeout of pending tasks, and wakes the tasks whose events are
349    /// ready. This method is similar to [`select`](Concurrent::select), but it
350    /// does not block and returns immediately.
351    pub fn peek(&self) {
352        let select = pin!(self.select_impl(true));
353        let poll = select.poll(&mut Context::from_waker(Waker::noop()));
354        debug_assert_eq!(poll, Ready(()), "peek should not block");
355    }
356
357    /// Waits for any of pending tasks to become ready.
358    ///
359    /// This method performs a `select` system call with the file descriptors
360    /// and timeout of pending tasks, and wakes the tasks whose events are
361    /// ready. This method should be called in the main loop of the process to
362    /// ensure that tasks can make progress. In a typical use case, the main
363    /// loop would look like this:
364    ///
365    /// ```ignore
366    /// loop {
367    ///     // Run ready tasks until they yield again
368    ///     run_ready_tasks();
369    ///     // Wait for any pending task to become ready
370    ///     concurrent.select().await;
371    /// }
372    /// ```
373    ///
374    /// The [`run_real`](Self::run_real) and [`run_virtual`](Self::run_virtual)
375    /// methods provide a convenient way to implement such a main loop.
376    ///
377    /// The future returned by this method will be pending if and only if the
378    /// future returned by the inner system's [`select`](Select::select) method
379    /// is pending.
380    pub async fn select(&self) {
381        self.select_impl(false).await;
382    }
383
384    #[allow(clippy::await_holding_refcell_ref)]
385    async fn select_impl(&self, peek: bool) {
386        // In this method, we keep the borrow of `state` across the `await` point. This is
387        // intentional because the real `select` call blocks the entire process, so there cannot
388        // be any other task that modifies the state while we are waiting for the `select` call to
389        // return.
390        let mut state = self.state.borrow_mut();
391
392        // Prepare parameters for the `select` call based on the current state
393        let mut readers = state.reads.keys().cloned().collect();
394        let mut writers = state.writes.keys().cloned().collect();
395        let timeout = if peek {
396            Some(Duration::ZERO)
397        } else {
398            state
399                .timeouts
400                .next_wake_time()
401                .map(|target| target.saturating_duration_since(self.inner.now()))
402        };
403        let signal_mask = (state.signals.strong_count() > 0)
404            .then(|| state.select_mask.as_deref())
405            .flatten();
406
407        // Perform the `select` call
408        let result = self
409            .inner
410            .select(&mut readers, &mut writers, timeout, signal_mask)
411            .await;
412
413        // Wake eligible tasks
414        if result != Err(Errno::EINTR) {
415            // If `select` succeeded, `readers` and `writers` contain the lists of ready FDs. In
416            // case of error, `select` leaves the input lists unmodified (which is required by
417            // POSIX), but we don't know which FD caused the error, so we conservatively wake all
418            // tasks waiting for any FD.
419            wake_tasks_for_ready_fds(&mut state.reads, &readers);
420            wake_tasks_for_ready_fds(&mut state.writes, &writers);
421        }
422        if !state.timeouts.is_empty() {
423            state.timeouts.wake(self.inner.now());
424        }
425        if let Some(signal_list) = state.signals.upgrade() {
426            // If the upgrade succeeds, it means there are tasks waiting for signals, so let's
427            // check if we have caught any signals.
428            let signals = self.inner.caught_signals();
429            if !signals.is_empty() {
430                let set_result = signal_list.0.set(signals);
431                debug_assert_eq!(set_result, Ok(()), "SignalList should not be filled yet");
432                state.catches.wake_all();
433                // Drop the list so that the next wait_for_signals call can create a new one.
434                state.signals = Weak::new();
435            }
436        }
437    }
438}
439
440fn wake_tasks_for_ready_fds(task_map: &mut HashMap<Fd, WakerSet>, ready_fds: &[Fd]) {
441    task_map.retain(|fd, wakers| {
442        if ready_fds.contains(fd) {
443            wakers.wake_all();
444            false
445        } else {
446            true
447        }
448    })
449}
450
451/// Guard for temporarily setting a file descriptor to non-blocking mode and
452/// restoring the original blocking mode when dropped
453#[derive(Debug)]
454struct TemporaryNonBlockingGuard<'a, S: Fcntl> {
455    system: &'a Concurrent<S>,
456    fd: Fd,
457    original_nonblocking: bool,
458}
459
460impl<'a, S: Fcntl> TemporaryNonBlockingGuard<'a, S> {
461    fn new(system: &'a Concurrent<S>, fd: Fd) -> Self {
462        Self {
463            system,
464            fd,
465            original_nonblocking: system.inner.get_and_set_nonblocking(fd, true) == Ok(true),
466        }
467    }
468}
469
470impl<'a, S: Fcntl> Drop for TemporaryNonBlockingGuard<'a, S> {
471    fn drop(&mut self) {
472        if !self.original_nonblocking {
473            self.system
474                .inner
475                .get_and_set_nonblocking(self.fd, false)
476                .ok();
477        }
478    }
479}
480
481impl<'a, S: Fcntl> Deref for TemporaryNonBlockingGuard<'a, S> {
482    type Target = Concurrent<S>;
483
484    fn deref(&self) -> &Self::Target {
485        self.system
486    }
487}
488
489/// List of received signals
490///
491/// This struct is returned by the [`Concurrent::wait_for_signals`] method to
492/// represent the list of signals that have been caught. This is a simple
493/// wrapper around `Vec<crate::signal::Number>` that is accessible through
494/// `Deref` and `DerefMut`.
495#[derive(Clone, Debug, Eq, PartialEq)]
496pub struct SignalList(OnceCell<Vec<crate::signal::Number>>);
497
498impl Deref for SignalList {
499    type Target = Vec<crate::signal::Number>;
500
501    fn deref(&self) -> &Vec<crate::signal::Number> {
502        // `unwrap` is safe because the list is initialized before being made available to the user.
503        self.0.get().unwrap()
504    }
505}
506
507impl DerefMut for SignalList {
508    fn deref_mut(&mut self) -> &mut Vec<crate::signal::Number> {
509        // `unwrap` is safe because the list is initialized before being made available to the user.
510        self.0.get_mut().unwrap()
511    }
512}
513
514impl SignalList {
515    #[must_use]
516    fn new() -> Self {
517        Self(OnceCell::new())
518    }
519
520    /// Consumes the `SignalList` and returns the inner list of signals.
521    pub fn into_vec(self) -> Vec<crate::signal::Number> {
522        // `unwrap` is safe because the list is initialized before being made available to the user.
523        self.0.into_inner().unwrap()
524    }
525}
526
527mod delegates;
528mod run;
529mod rw_all;
530mod signal;
531
532#[cfg(test)]
533mod tests {
534    use super::super::{
535        Close as _, Disposition, Mode, OfdAccess, Open as _, OpenFlag, Pipe as _, SendSignal,
536    };
537    use super::*;
538    use crate::system::r#virtual::{PIPE_SIZE, SIGCHLD, SIGINT, SIGUSR2, VirtualSystem};
539    use crate::test_helper::WakeFlag;
540    use crate::trap::SignalSystem as _;
541    use assert_matches::assert_matches;
542    use futures_util::FutureExt as _;
543    use std::sync::Arc;
544    use std::task::Poll::{Pending, Ready};
545
546    #[test]
547    fn peek_with_no_conditions_returns_immediately() {
548        let system = Concurrent::new(VirtualSystem::new());
549        system.peek();
550    }
551
552    #[test]
553    fn select_with_no_conditions_never_completes() {
554        let system = Concurrent::new(VirtualSystem::new());
555
556        let future = pin!(system.select());
557
558        let wake_flag = Arc::new(WakeFlag::new());
559        let waker = Waker::from(wake_flag.clone());
560        let mut context = Context::from_waker(&waker);
561        assert_eq!(future.poll(&mut context), Pending);
562        assert!(!wake_flag.is_woken());
563    }
564
565    #[test]
566    fn regular_file_read_completes_immediately() {
567        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
568        let fd = system
569            .open(
570                c"/foo",
571                OfdAccess::ReadOnly,
572                OpenFlag::Create.into(),
573                Mode::empty(),
574            )
575            .now_or_never()
576            .unwrap()
577            .unwrap();
578
579        let mut buffer = [0; 4];
580        let future = pin!(system.read(fd, &mut buffer));
581
582        let wake_flag = Arc::new(WakeFlag::new());
583        let waker = Waker::from(wake_flag.clone());
584        let mut context = Context::from_waker(&waker);
585        assert_eq!(future.poll(&mut context), Ready(Ok(0)));
586        assert!(!wake_flag.is_woken());
587    }
588
589    #[test]
590    fn pipe_read_becomes_ready_on_data_available() {
591        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
592        let (read_fd, write_fd) = system.pipe().unwrap();
593
594        let mut buffer1 = [0; 4];
595        let mut buffer2 = [0; 4];
596        let mut read1 = pin!(system.read(read_fd, &mut buffer1));
597        let mut read2 = pin!(system.read(read_fd, &mut buffer2));
598
599        let wake_flag1 = Arc::new(WakeFlag::new());
600        let wake_flag2 = Arc::new(WakeFlag::new());
601        let waker1 = Waker::from(wake_flag1.clone());
602        let waker2 = Waker::from(wake_flag2.clone());
603        let mut context1 = Context::from_waker(&waker1);
604        let mut context2 = Context::from_waker(&waker2);
605        assert_eq!(read1.as_mut().poll(&mut context1), Pending);
606        assert_eq!(read2.as_mut().poll(&mut context2), Pending);
607
608        let mut select = pin!(system.select());
609        let mut context3 = Context::from_waker(Waker::noop());
610        assert_eq!(select.as_mut().poll(&mut context3), Pending);
611        assert!(!wake_flag1.is_woken());
612        assert!(!wake_flag2.is_woken());
613
614        // Write data to the pipe to make the reads ready
615        system
616            .write(write_fd, &[1, 2, 3, 4])
617            .now_or_never()
618            .unwrap()
619            .unwrap();
620        assert_eq!(select.as_mut().poll(&mut context3), Ready(()));
621        assert!(wake_flag1.is_woken());
622        assert!(wake_flag2.is_woken());
623    }
624
625    #[test]
626    fn select_wakes_only_read_tasks_with_ready_fd() {
627        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
628        let (read_fd1, write_fd1) = system.pipe().unwrap();
629        let (read_fd2, _write_fd2) = system.pipe().unwrap();
630
631        let mut buffer1 = [0; 4];
632        let mut buffer2 = [0; 4];
633        let mut read1 = pin!(system.read(read_fd1, &mut buffer1));
634        let mut read2 = pin!(system.read(read_fd2, &mut buffer2));
635
636        let wake_flag1 = Arc::new(WakeFlag::new());
637        let wake_flag2 = Arc::new(WakeFlag::new());
638        let waker1 = Waker::from(wake_flag1.clone());
639        let waker2 = Waker::from(wake_flag2.clone());
640        let mut context1 = Context::from_waker(&waker1);
641        let mut context2 = Context::from_waker(&waker2);
642        assert_eq!(read1.as_mut().poll(&mut context1), Pending);
643        assert_eq!(read2.as_mut().poll(&mut context2), Pending);
644
645        let mut select = pin!(system.select());
646        let mut context3 = Context::from_waker(Waker::noop());
647        assert_eq!(select.as_mut().poll(&mut context3), Pending);
648        assert!(!wake_flag1.is_woken());
649        assert!(!wake_flag2.is_woken());
650
651        // Write data to the first pipe to make the first read ready
652        system
653            .write(write_fd1, &[1, 2, 3, 4])
654            .now_or_never()
655            .unwrap()
656            .unwrap();
657        assert_eq!(select.as_mut().poll(&mut context3), Ready(()));
658        assert!(wake_flag1.is_woken());
659        assert!(!wake_flag2.is_woken());
660    }
661
662    #[test]
663    fn read_preserves_fd_blocking_mode() {
664        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
665        let fd = system
666            .open(
667                c"/foo",
668                OfdAccess::ReadOnly,
669                OpenFlag::Create.into(),
670                Mode::empty(),
671            )
672            .now_or_never()
673            .unwrap()
674            .unwrap();
675
676        let mut buffer = [0; 4];
677        system
678            .read(fd, &mut buffer)
679            .now_or_never()
680            .unwrap()
681            .unwrap();
682
683        // The file descriptor should have the same blocking mode as before
684        // (which is blocking by default)
685        assert_eq!(system.inner.get_and_set_nonblocking(fd, false), Ok(false));
686
687        system.inner.get_and_set_nonblocking(fd, true).ok();
688        system
689            .read(fd, &mut buffer)
690            .now_or_never()
691            .unwrap()
692            .unwrap();
693        // The file descriptor should have the same blocking mode as before
694        // (which was set to non-blocking before the read)
695        assert_eq!(system.inner.get_and_set_nonblocking(fd, true), Ok(true));
696    }
697
698    #[test]
699    fn regular_file_write_completes_immediately() {
700        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
701        let fd = system
702            .open(
703                c"/foo",
704                OfdAccess::WriteOnly,
705                OpenFlag::Create.into(),
706                Mode::empty(),
707            )
708            .now_or_never()
709            .unwrap()
710            .unwrap();
711
712        let buffer = [1, 2, 3, 4];
713        let future = pin!(system.write(fd, &buffer));
714
715        let wake_flag = Arc::new(WakeFlag::new());
716        let waker = Waker::from(wake_flag.clone());
717        let mut context = Context::from_waker(&waker);
718        assert_eq!(future.poll(&mut context), Ready(Ok(4)));
719        assert!(!wake_flag.is_woken());
720    }
721
722    #[test]
723    fn pipe_write_becomes_ready_on_buffer_space() {
724        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
725        let (read_fd, write_fd) = system.pipe().unwrap();
726        // Fill the pipe buffer to make the next write pending
727        system
728            .write(write_fd, &[0; PIPE_SIZE])
729            .now_or_never()
730            .unwrap()
731            .unwrap();
732
733        let buffer1 = [1, 2, 3, 4];
734        let buffer2 = [5, 6, 7, 8];
735        let mut write1 = pin!(system.write(write_fd, &buffer1));
736        let mut write2 = pin!(system.write(write_fd, &buffer2));
737
738        let wake_flag1 = Arc::new(WakeFlag::new());
739        let wake_flag2 = Arc::new(WakeFlag::new());
740        let waker1 = Waker::from(wake_flag1.clone());
741        let waker2 = Waker::from(wake_flag2.clone());
742        let mut context1 = Context::from_waker(&waker1);
743        let mut context2 = Context::from_waker(&waker2);
744        assert_eq!(write1.as_mut().poll(&mut context1), Pending);
745        assert_eq!(write2.as_mut().poll(&mut context2), Pending);
746
747        let mut select = pin!(system.select());
748        let mut context3 = Context::from_waker(Waker::noop());
749        assert_eq!(select.as_mut().poll(&mut context3), Pending);
750        assert!(!wake_flag1.is_woken());
751        assert!(!wake_flag2.is_woken());
752
753        // Make space in the pipe buffer to make the writes ready
754        let mut read_buffer = [0; PIPE_SIZE];
755        system
756            .read(read_fd, &mut read_buffer)
757            .now_or_never()
758            .unwrap()
759            .unwrap();
760        assert_eq!(select.as_mut().poll(&mut context3), Ready(()));
761        assert!(wake_flag1.is_woken());
762        assert!(wake_flag2.is_woken());
763    }
764
765    #[test]
766    fn select_wakes_only_write_tasks_with_ready_fd() {
767        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
768        let (read_fd1, write_fd1) = system.pipe().unwrap();
769        let (_read_fd2, write_fd2) = system.pipe().unwrap();
770        // Fill the pipe buffers to make the next writes pending
771        system
772            .write(write_fd1, &[0; PIPE_SIZE])
773            .now_or_never()
774            .unwrap()
775            .unwrap();
776        system
777            .write(write_fd2, &[0; PIPE_SIZE])
778            .now_or_never()
779            .unwrap()
780            .unwrap();
781
782        let buffer1 = [1, 2, 3, 4];
783        let buffer2 = [5, 6, 7, 8];
784        let mut write1 = pin!(system.write(write_fd1, &buffer1));
785        let mut write2 = pin!(system.write(write_fd2, &buffer2));
786
787        let wake_flag1 = Arc::new(WakeFlag::new());
788        let wake_flag2 = Arc::new(WakeFlag::new());
789        let waker1 = Waker::from(wake_flag1.clone());
790        let waker2 = Waker::from(wake_flag2.clone());
791        let mut context1 = Context::from_waker(&waker1);
792        let mut context2 = Context::from_waker(&waker2);
793        assert_eq!(write1.as_mut().poll(&mut context1), Pending);
794        assert_eq!(write2.as_mut().poll(&mut context2), Pending);
795
796        let mut select = pin!(system.select());
797        let mut context3 = Context::from_waker(Waker::noop());
798        assert_eq!(select.as_mut().poll(&mut context3), Pending);
799        assert!(!wake_flag1.is_woken());
800        assert!(!wake_flag2.is_woken());
801
802        // Make space in the pipe buffer to make the write ready
803        let mut read_buffer = [0; PIPE_SIZE];
804        system
805            .read(read_fd1, &mut read_buffer)
806            .now_or_never()
807            .unwrap()
808            .unwrap();
809        assert_eq!(select.as_mut().poll(&mut context3), Ready(()));
810        assert!(wake_flag1.is_woken());
811        assert!(!wake_flag2.is_woken());
812    }
813
814    #[test]
815    fn write_preserves_fd_blocking_mode() {
816        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
817        let fd = system
818            .open(
819                c"/foo",
820                OfdAccess::WriteOnly,
821                OpenFlag::Create.into(),
822                Mode::empty(),
823            )
824            .now_or_never()
825            .unwrap()
826            .unwrap();
827
828        let buffer = [1, 2, 3, 4];
829        system.write(fd, &buffer).now_or_never().unwrap().unwrap();
830
831        // The file descriptor should have the same blocking mode as before
832        // (which is blocking by default)
833        assert_eq!(system.inner.get_and_set_nonblocking(fd, false), Ok(false));
834
835        system.inner.get_and_set_nonblocking(fd, true).ok();
836        system.write(fd, &buffer).now_or_never().unwrap().unwrap();
837        // The file descriptor should have the same blocking mode as before
838        // (which was set to non-blocking before the write)
839        assert_eq!(system.inner.get_and_set_nonblocking(fd, true), Ok(true));
840    }
841
842    #[test]
843    fn sleep_completes_after_duration() {
844        let system = VirtualSystem::new();
845        let state = system.state.clone();
846        let now = Instant::now();
847        state.borrow_mut().now = Some(now);
848        let system = Concurrent::new(system);
849
850        let mut sleep = pin!(system.sleep(Duration::from_secs(1)));
851
852        let wake_flag = Arc::new(WakeFlag::new());
853        let waker = Waker::from(wake_flag.clone());
854        let mut context = Context::from_waker(&waker);
855        assert_eq!(sleep.as_mut().poll(&mut context), Pending);
856
857        let mut select = pin!(system.select());
858        assert_eq!(select.as_mut().poll(&mut context), Pending);
859        assert!(!wake_flag.is_woken());
860
861        // Advance time by 1 second to make the sleep ready
862        state
863            .borrow_mut()
864            .advance_time(now + Duration::from_secs(1));
865        assert_eq!(select.as_mut().poll(&mut context), Ready(()));
866        assert!(wake_flag.is_woken());
867
868        let wake_flag = Arc::new(WakeFlag::new());
869        let waker = Waker::from(wake_flag.clone());
870        let mut context = Context::from_waker(&waker);
871        assert_eq!(sleep.poll(&mut context), Ready(()));
872        assert!(!wake_flag.is_woken());
873    }
874
875    #[test]
876    fn signal_wait_completes_on_signal() {
877        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
878        system
879            .set_disposition(SIGINT, Disposition::Catch)
880            .now_or_never()
881            .unwrap()
882            .unwrap();
883        system
884            .set_disposition(SIGCHLD, Disposition::Catch)
885            .now_or_never()
886            .unwrap()
887            .unwrap();
888        system
889            .set_disposition(SIGUSR2, Disposition::Catch)
890            .now_or_never()
891            .unwrap()
892            .unwrap();
893
894        let mut wait = pin!(system.wait_for_signals());
895
896        let wake_flag = Arc::new(WakeFlag::new());
897        let waker = Waker::from(wake_flag.clone());
898        let mut context = Context::from_waker(&waker);
899        assert_eq!(wait.as_mut().poll(&mut context), Pending);
900
901        let mut select = pin!(system.select());
902        let mut null_context = Context::from_waker(Waker::noop());
903        assert_eq!(select.as_mut().poll(&mut null_context), Pending);
904        assert!(!wake_flag.is_woken());
905
906        // Send signals to make the wait ready
907        system.raise(SIGINT).now_or_never().unwrap().unwrap();
908        system.raise(SIGCHLD).now_or_never().unwrap().unwrap();
909        assert_eq!(select.as_mut().poll(&mut null_context), Ready(()));
910        assert!(wake_flag.is_woken());
911
912        let wake_flag = Arc::new(WakeFlag::new());
913        let waker = Waker::from(wake_flag.clone());
914        let mut context = Context::from_waker(&waker);
915        assert_matches!(wait.poll(&mut context), Ready(signals) => {
916            assert_matches!(***signals, [SIGINT, SIGCHLD] | [SIGCHLD, SIGINT]);
917        });
918    }
919
920    #[test]
921    fn select_does_not_consume_caught_signals_until_tasks_are_waiting_for_signals() {
922        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
923        let (read_fd, write_fd) = system.pipe().unwrap();
924        system
925            .set_disposition(SIGCHLD, Disposition::Catch)
926            .now_or_never()
927            .unwrap()
928            .unwrap();
929        system.raise(SIGCHLD).now_or_never().unwrap().unwrap();
930
931        let mut buffer = [0; 4];
932        let mut read = pin!(system.read(read_fd, &mut buffer));
933
934        let mut null_context = Context::from_waker(Waker::noop());
935        assert_eq!(read.as_mut().poll(&mut null_context), Pending);
936
937        system
938            .write(write_fd, b"foo")
939            .now_or_never()
940            .unwrap()
941            .unwrap();
942        system.select().now_or_never().unwrap();
943
944        let mut wait = pin!(system.wait_for_signals());
945        assert_eq!(wait.as_mut().poll(&mut null_context), Pending);
946
947        system.select().now_or_never().unwrap();
948        assert_matches!(wait.poll(&mut null_context), Ready(signals) => {
949            assert_eq!(**signals, &[SIGCHLD]);
950        });
951    }
952
953    #[test]
954    fn wait_for_signals_can_be_used_many_times() {
955        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
956        system
957            .set_disposition(SIGINT, Disposition::Catch)
958            .now_or_never()
959            .unwrap()
960            .unwrap();
961        system
962            .set_disposition(SIGCHLD, Disposition::Catch)
963            .now_or_never()
964            .unwrap()
965            .unwrap();
966
967        let mut wait1 = pin!(system.wait_for_signals());
968        let mut null_context = Context::from_waker(Waker::noop());
969        assert_eq!(wait1.as_mut().poll(&mut null_context), Pending);
970
971        system.raise(SIGCHLD).now_or_never().unwrap().unwrap();
972        system.select().now_or_never().unwrap();
973
974        let mut wait2 = pin!(system.wait_for_signals());
975        assert_eq!(wait2.as_mut().poll(&mut null_context), Pending);
976
977        system.raise(SIGINT).now_or_never().unwrap().unwrap();
978        system.select().now_or_never().unwrap();
979
980        assert_matches!(wait1.poll(&mut null_context), Ready(signals) => {
981            assert_eq!(**signals, &[SIGCHLD]);
982        });
983        assert_matches!(wait2.poll(&mut null_context), Ready(signals) => {
984            assert_eq!(**signals, &[SIGINT]);
985        });
986    }
987
988    #[test]
989    fn select_completes_when_any_condition_is_ready() {
990        let system = VirtualSystem::new();
991        let state = system.state.clone();
992        let now = Instant::now();
993        state.borrow_mut().now = Some(now);
994        let system = Rc::new(Concurrent::new(system));
995        let (read_fd, write_fd) = system.pipe().unwrap();
996        let mut buffer = [0; 4];
997        system
998            .set_disposition(SIGINT, Disposition::Catch)
999            .now_or_never()
1000            .unwrap()
1001            .unwrap();
1002
1003        let mut sleep = pin!(system.sleep(Duration::from_secs(3)));
1004        let mut read = pin!(system.read(read_fd, &mut buffer));
1005        let mut wait = pin!(system.wait_for_signals());
1006
1007        let wake_sleep = Arc::new(WakeFlag::new());
1008        let wake_read = Arc::new(WakeFlag::new());
1009        let wake_wait = Arc::new(WakeFlag::new());
1010        let sleep_waker = Waker::from(wake_sleep.clone());
1011        let read_waker = Waker::from(wake_read.clone());
1012        let wait_waker = Waker::from(wake_wait.clone());
1013        let mut sleep_context = Context::from_waker(&sleep_waker);
1014        let mut read_context = Context::from_waker(&read_waker);
1015        let mut wait_context = Context::from_waker(&wait_waker);
1016        assert_eq!(sleep.as_mut().poll(&mut sleep_context), Pending);
1017        assert_eq!(read.as_mut().poll(&mut read_context), Pending);
1018        assert_eq!(wait.as_mut().poll(&mut wait_context), Pending);
1019
1020        let mut select = pin!(system.select());
1021
1022        let wake_select = Arc::new(WakeFlag::new());
1023        let select_waker = Waker::from(wake_select.clone());
1024        let mut select_context = Context::from_waker(&select_waker);
1025        assert_eq!(select.as_mut().poll(&mut select_context), Pending);
1026        assert!(!wake_sleep.is_woken());
1027        assert!(!wake_read.is_woken());
1028        assert!(!wake_wait.is_woken());
1029        assert!(!wake_select.is_woken());
1030
1031        system
1032            .write(write_fd, b"foo")
1033            .now_or_never()
1034            .unwrap()
1035            .unwrap();
1036        assert!(wake_select.is_woken());
1037
1038        let wake_select = Arc::new(WakeFlag::new());
1039        let select_waker = Waker::from(wake_select.clone());
1040        let mut select_context = Context::from_waker(&select_waker);
1041        assert_eq!(select.as_mut().poll(&mut select_context), Ready(()));
1042        assert!(!wake_sleep.is_woken());
1043        assert!(wake_read.is_woken());
1044        assert!(!wake_wait.is_woken());
1045        assert!(!wake_select.is_woken());
1046
1047        assert_eq!(read.now_or_never().unwrap(), Ok(3));
1048
1049        let mut select = pin!(system.select());
1050
1051        let wake_select = Arc::new(WakeFlag::new());
1052        let select_waker = Waker::from(wake_select.clone());
1053        let mut select_context = Context::from_waker(&select_waker);
1054        assert_eq!(select.as_mut().poll(&mut select_context), Pending);
1055        assert!(!wake_sleep.is_woken());
1056        assert!(!wake_wait.is_woken());
1057        assert!(!wake_select.is_woken());
1058
1059        state
1060            .borrow_mut()
1061            .advance_time(now + Duration::from_secs(3));
1062        assert!(wake_select.is_woken());
1063
1064        let wake_select = Arc::new(WakeFlag::new());
1065        let select_waker = Waker::from(wake_select.clone());
1066        let mut select_context = Context::from_waker(&select_waker);
1067        assert_eq!(select.as_mut().poll(&mut select_context), Ready(()));
1068        assert!(wake_sleep.is_woken());
1069        assert!(!wake_wait.is_woken());
1070        assert!(!wake_select.is_woken());
1071
1072        sleep.now_or_never().unwrap();
1073
1074        let mut select = pin!(system.select());
1075
1076        let wake_select = Arc::new(WakeFlag::new());
1077        let select_waker = Waker::from(wake_select.clone());
1078        let mut select_context = Context::from_waker(&select_waker);
1079        assert_eq!(select.as_mut().poll(&mut select_context), Pending);
1080        assert!(!wake_wait.is_woken());
1081        assert!(!wake_select.is_woken());
1082
1083        system.raise(SIGINT).now_or_never().unwrap().unwrap();
1084        assert!(wake_select.is_woken());
1085
1086        let wake_select = Arc::new(WakeFlag::new());
1087        let select_waker = Waker::from(wake_select.clone());
1088        let mut select_context = Context::from_waker(&select_waker);
1089        assert_eq!(select.as_mut().poll(&mut select_context), Ready(()));
1090        assert!(wake_wait.is_woken());
1091        assert!(!wake_select.is_woken());
1092
1093        assert_eq!(**wait.now_or_never().unwrap(), &[SIGINT]);
1094    }
1095
1096    #[test]
1097    fn select_wakes_all_reads_and_writes_on_ebadf() {
1098        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
1099        let (read_fd1, _write_fd1) = system.pipe().unwrap();
1100        let (_read_fd2, write_fd2) = system.pipe().unwrap();
1101        // Fill the pipe buffer to make the next write pending
1102        system
1103            .write(write_fd2, &[0; PIPE_SIZE])
1104            .now_or_never()
1105            .unwrap()
1106            .unwrap();
1107
1108        let mut read_buffer = [0; 4];
1109        let mut read = pin!(system.read(read_fd1, &mut read_buffer));
1110        let mut write = pin!(system.write(write_fd2, &[1, 2, 3, 4]));
1111
1112        let wake_flag1 = Arc::new(WakeFlag::new());
1113        let wake_flag2 = Arc::new(WakeFlag::new());
1114        let waker1 = Waker::from(wake_flag1.clone());
1115        let waker2 = Waker::from(wake_flag2.clone());
1116        let mut context1 = Context::from_waker(&waker1);
1117        let mut context2 = Context::from_waker(&waker2);
1118        assert_eq!(read.as_mut().poll(&mut context1), Pending);
1119        assert_eq!(write.as_mut().poll(&mut context2), Pending);
1120
1121        let mut select = pin!(system.select());
1122
1123        let wake_select = Arc::new(WakeFlag::new());
1124        let select_waker = Waker::from(wake_select.clone());
1125        let mut select_context = Context::from_waker(&select_waker);
1126        assert_eq!(select.as_mut().poll(&mut select_context), Pending);
1127        assert!(!wake_flag1.is_woken());
1128        assert!(!wake_flag2.is_woken());
1129        assert!(!wake_select.is_woken());
1130
1131        // Close the file descriptor to make the select call return EBADF
1132        system.close(read_fd1).unwrap();
1133        assert!(wake_select.is_woken());
1134
1135        let wake_select = Arc::new(WakeFlag::new());
1136        let select_waker = Waker::from(wake_select.clone());
1137        let mut select_context = Context::from_waker(&select_waker);
1138        assert_eq!(select.as_mut().poll(&mut select_context), Ready(()));
1139        assert!(wake_flag1.is_woken());
1140        assert!(wake_flag2.is_woken());
1141        assert!(!wake_select.is_woken());
1142    }
1143
1144    #[test]
1145    fn select_does_not_wake_reads_or_writes_on_eintr() {
1146        // Prepare a system and a pipe
1147        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
1148        let (read_fd1, _write_fd1) = system.pipe().unwrap();
1149        let (_read_fd2, write_fd2) = system.pipe().unwrap();
1150        // Fill the pipe buffer to make the next write pending
1151        system
1152            .write(write_fd2, &[0; PIPE_SIZE])
1153            .now_or_never()
1154            .unwrap()
1155            .unwrap();
1156        system
1157            .set_disposition(SIGUSR2, Disposition::Catch)
1158            .now_or_never()
1159            .unwrap()
1160            .unwrap();
1161
1162        let mut read_buffer = [0; 4];
1163        let mut read = pin!(system.read(read_fd1, &mut read_buffer));
1164        let mut write = pin!(system.write(write_fd2, &[1]));
1165        let mut wait = pin!(system.wait_for_signals());
1166
1167        let wake_flag1 = Arc::new(WakeFlag::new());
1168        let wake_flag2 = Arc::new(WakeFlag::new());
1169        let wake_flag3 = Arc::new(WakeFlag::new());
1170        let waker1 = Waker::from(wake_flag1.clone());
1171        let waker2 = Waker::from(wake_flag2.clone());
1172        let waker3 = Waker::from(wake_flag3.clone());
1173        let mut context1 = Context::from_waker(&waker1);
1174        let mut context2 = Context::from_waker(&waker2);
1175        let mut context3 = Context::from_waker(&waker3);
1176        assert_eq!(read.as_mut().poll(&mut context1), Pending);
1177        assert_eq!(write.as_mut().poll(&mut context2), Pending);
1178        assert_eq!(wait.as_mut().poll(&mut context3), Pending);
1179        assert!(!wake_flag1.is_woken());
1180        assert!(!wake_flag2.is_woken());
1181        assert!(!wake_flag3.is_woken());
1182
1183        system.raise(SIGUSR2).now_or_never().unwrap().unwrap();
1184
1185        let mut select_fut = pin!(system.select());
1186        let mut context4 = Context::from_waker(Waker::noop());
1187        assert_eq!(select_fut.as_mut().poll(&mut context4), Ready(()));
1188        assert!(!wake_flag1.is_woken());
1189        assert!(!wake_flag2.is_woken());
1190    }
1191
1192    #[test]
1193    fn signal_wait_is_made_ready_by_peek_after_caught() {
1194        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
1195        system
1196            .set_disposition(SIGINT, Disposition::Catch)
1197            .now_or_never()
1198            .unwrap()
1199            .unwrap();
1200        system
1201            .set_disposition(SIGCHLD, Disposition::Catch)
1202            .now_or_never()
1203            .unwrap()
1204            .unwrap();
1205        system
1206            .set_disposition(SIGUSR2, Disposition::Catch)
1207            .now_or_never()
1208            .unwrap()
1209            .unwrap();
1210
1211        let mut wait = pin!(system.wait_for_signals());
1212
1213        let wake_flag = Arc::new(WakeFlag::new());
1214        let waker = Waker::from(wake_flag.clone());
1215        let mut context = Context::from_waker(&waker);
1216        assert_eq!(wait.as_mut().poll(&mut context), Pending);
1217
1218        system.peek();
1219        assert!(!wake_flag.is_woken());
1220
1221        // Send signals to make the wait ready
1222        system.raise(SIGINT).now_or_never().unwrap().unwrap();
1223        system.raise(SIGCHLD).now_or_never().unwrap().unwrap();
1224        system.peek();
1225        assert!(wake_flag.is_woken());
1226
1227        let wake_flag = Arc::new(WakeFlag::new());
1228        let waker = Waker::from(wake_flag.clone());
1229        let mut context = Context::from_waker(&waker);
1230        assert_matches!(wait.poll(&mut context), Ready(signals) => {
1231            assert_matches!(***signals, [SIGINT, SIGCHLD] | [SIGCHLD, SIGINT]);
1232        });
1233    }
1234}