use super::{
Duration, Errno, Fd, Result, Select, SigmaskOp, TryInto, VirtualSystem, raise_sigchld, signal,
};
use crate::job::ProcessState;
use std::cell::{Cell, LazyCell};
use std::ffi::c_int;
use std::future::poll_fn;
use std::rc::Rc;
use std::task::{Poll, Waker};
/// Simulated `select` for the virtual system.
impl Select for VirtualSystem {
    /// Waits until at least one of the given file descriptors is ready, the
    /// timeout expires, or a signal is caught.
    ///
    /// # Parameters
    ///
    /// - `readers` / `writers`: FDs to test for read / write readiness. When
    ///   some FD is ready, both vectors are replaced with the ready subsets.
    ///   When the timeout expires, both vectors are cleared. On error they are
    ///   left untouched.
    /// - `timeout`: `None` waits indefinitely, `Some(Duration::ZERO)` makes
    ///   the first readiness check decisive, and any other value is turned
    ///   into a deadline relative to the system state's current time.
    /// - `signal_mask`: if given, it replaces the process's blocked signal set
    ///   for the duration of the call; the previous set is put back before the
    ///   future completes.
    ///
    /// # Returns
    ///
    /// - `Ok(n)` where `n` is the number of ready readers plus ready writers,
    ///   or `Ok(0)` when the timeout expired.
    /// - `Err(Errno::EBADF)` if an FD is not in the process's FD table.
    /// - `Err(Errno::EINTR)` if the number of caught signals has changed since
    ///   the call started.
    ///
    /// # Panics
    ///
    /// Panics if the current process is missing from the system state, if a
    /// non-zero timeout is given while the system state has no current time,
    /// or if the number of ready FDs does not fit in `c_int`.
    fn select<'a>(
        &self,
        readers: &'a mut Vec<Fd>,
        writers: &'a mut Vec<Fd>,
        timeout: Option<Duration>,
        signal_mask: Option<&[signal::Number]>,
    ) -> impl Future<Output = Result<c_int>> + use<'a> {
        // The returned future only captures `'a`, so it needs its own handle
        // to the system and its own copy of the requested mask.
        let system = self.clone();
        let requested_mask = signal_mask.map(<[signal::Number]>::to_vec);

        // Every `RefCell` borrow below ends before an await point is reached:
        // the borrows either live in an inner scope or are dropped explicitly.
        #[allow(clippy::await_holding_refcell_ref)]
        async move {
            // Step 1: install the temporary signal mask and fix the deadline.
            let (saved_mask, signal_count_before, deadline) = {
                let state = &mut *system.state.borrow_mut();
                let process = state
                    .processes
                    .get_mut(&system.process_id)
                    .expect("the current process should be in the system state");

                // Must be sampled before the mask changes so that signals
                // caught as a result of the mask change are noticed below.
                let signal_count_before = process.caught_signals.len();

                let saved_mask = if let Some(temporary_mask) = requested_mask {
                    let previous: Vec<signal::Number> =
                        process.blocked_signals().iter().copied().collect();
                    let change = process.block_signals(SigmaskOp::Set, &temporary_mask);
                    if change.process_state_changed {
                        let parent = process.ppid;
                        raise_sigchld(state, parent);
                    }
                    Some(previous)
                } else {
                    None
                };

                // A zero timeout needs no deadline (and no current time).
                let deadline = match timeout {
                    Some(limit) if !limit.is_zero() => {
                        let now = state
                            .now
                            .expect("the current time should be set in the system state");
                        Some(now + limit)
                    }
                    _ => None,
                };

                (saved_mask, signal_count_before, deadline)
            };

            // All registrations below hand out weak references to this cell.
            let shared_waker: LazyCell<Rc<Cell<Option<Waker>>>> = LazyCell::default();

            // Step 2: poll until there is something to report.
            let outcome = poll_fn(|context| {
                let state = &mut *system.state.borrow_mut();
                let process = state
                    .processes
                    .get_mut(&system.process_id)
                    .expect("the current process should be in the system state");

                // While the process is stopped, nothing is examined; the
                // waker is only registered through `wake_on_resumption`.
                let stopped = match process.state() {
                    ProcessState::Halted(reason) => reason.is_stopped(),
                    _ => false,
                };
                if stopped {
                    shared_waker.set(Some(context.waker().clone()));
                    process.wake_on_resumption(Rc::downgrade(&shared_waker));
                    return Poll::Pending;
                }

                // A caught signal takes precedence over FD readiness.
                if process.caught_signals.len() != signal_count_before {
                    return Poll::Ready(Err(Errno::EINTR));
                }

                // Collect ready FDs into fresh vectors so that the caller's
                // vectors stay intact if an invalid FD is found.
                let mut readable = Vec::new();
                let mut writable = Vec::new();
                for &fd in readers.iter() {
                    match process.fds().get(&fd) {
                        None => return Poll::Ready(Err(Errno::EBADF)),
                        Some(body) => {
                            if body.open_file_description.borrow().is_ready_for_reading() {
                                readable.push(fd);
                            }
                        }
                    }
                }
                for &fd in writers.iter() {
                    match process.fds().get(&fd) {
                        None => return Poll::Ready(Err(Errno::EBADF)),
                        Some(body) => {
                            if body.open_file_description.borrow().is_ready_for_writing() {
                                writable.push(fd);
                            }
                        }
                    }
                }

                let ready_count: c_int = (readable.len() + writable.len()).try_into().unwrap();
                if ready_count > 0 {
                    *readers = readable;
                    *writers = writable;
                    return Poll::Ready(Ok(ready_count));
                }

                // Nothing is ready: has the time run out?
                let expired = if let Some(deadline) = deadline {
                    let now = state
                        .now
                        .expect("the current time should be set in the system state");
                    now >= deadline
                } else {
                    timeout == Some(Duration::ZERO)
                };
                if expired {
                    readers.clear();
                    writers.clear();
                    return Poll::Ready(Ok(0));
                }

                // Still waiting: register the waker with the process's signal
                // wakers, every watched open file description, and (if any)
                // the deadline.
                shared_waker.set(Some(context.waker().clone()));
                process.register_signal_waker(Rc::downgrade(&shared_waker));
                for fd in readers.iter() {
                    process.fds()[fd]
                        .open_file_description
                        .borrow_mut()
                        .register_reader_waker(Rc::downgrade(&shared_waker));
                }
                for fd in writers.iter() {
                    process.fds()[fd]
                        .open_file_description
                        .borrow_mut()
                        .register_writer_waker(Rc::downgrade(&shared_waker));
                }
                if let Some(deadline) = deadline {
                    state
                        .scheduled_wakers
                        .push(deadline, Rc::downgrade(&shared_waker));
                }
                Poll::Pending
            })
            .await;

            // Releases the strong reference created here, so the weak
            // references registered above can no longer be upgraded through it.
            drop(shared_waker);

            // Step 3: put the original signal mask back.
            if let Some(saved_mask) = saved_mask {
                let mut state = system.state.borrow_mut();
                let process = state
                    .processes
                    .get_mut(&system.process_id)
                    .expect("the current process should be in the system state");
                let change = process.block_signals(SigmaskOp::Set, &saved_mask);
                if change.process_state_changed {
                    let parent = process.ppid;
                    raise_sigchld(&mut state, parent);
                    // The borrow must not be held across the await below.
                    drop(state);
                    system.block_until_running().await;
                }
            }

            outcome
        }
    }
}
#[cfg(test)]
mod tests {
    use super::super::Process;
    use super::super::{PIPE_BUF, PIPE_SIZE, SIGCHLD, SIGCONT, SIGTSTP};
    use super::*;
    use crate::job::Pid;
    use crate::system::{
        CaughtSignals as _, Close as _, Disposition, Pipe as _, Read as _, SendSignal as _,
        Sigaction as _, Sigmask as _, Write as _,
    };
    use crate::test_helper::WakeFlag;
    use futures_util::FutureExt as _;
    use std::future::Future;
    use std::pin::{Pin, pin};
    use std::sync::Arc;
    use std::task::{Context, Waker};
    use std::time::Instant;

    /// Polls `future` exactly once with a waker backed by a fresh
    /// [`WakeFlag`].
    ///
    /// Returns the poll result together with the flag, so that the caller can
    /// check both immediately and after later events whether the future has
    /// requested a wake-up through the waker it was given in this poll.
    fn poll_once<F: Future>(future: Pin<&mut F>) -> (Poll<F::Output>, Arc<WakeFlag>) {
        let woken = Arc::new(WakeFlag::new());
        let waker = Waker::from(Arc::clone(&woken));
        let mut context = Context::from_waker(&waker);
        let poll = future.poll(&mut context);
        (poll, woken)
    }

    /// Without FDs, timeout, or signal mask, the future stays pending and
    /// does not wake itself.
    #[test]
    fn select_with_no_condition_blocks_forever() {
        let system = VirtualSystem::new();
        let mut readers = vec![];
        let mut writers = vec![];
        let mut select = pin!(system.select(&mut readers, &mut writers, None, None));

        let (poll, woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Pending);
        assert!(!woken.is_woken());
    }

    /// A zero timeout yields `Ok(0)` on the first poll.
    #[test]
    fn select_with_zero_timeout_returns_immediately() {
        let system = VirtualSystem::new();
        let mut readers = vec![];
        let mut writers = vec![];
        let mut select =
            pin!(system.select(&mut readers, &mut writers, Some(Duration::ZERO), None));

        let (poll, woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Ready(Ok(0)));
        assert!(!woken.is_woken());
    }

    /// The standard FDs of a fresh virtual system are all reported ready.
    #[test]
    fn select_regular_file_is_always_ready() {
        let system = VirtualSystem::new();
        let mut readers = vec![Fd::STDIN];
        let mut writers = vec![Fd::STDOUT, Fd::STDERR];
        {
            let mut select = pin!(system.select(&mut readers, &mut writers, None, None));

            let (poll, woken) = poll_once(select.as_mut());
            assert_eq!(poll, Poll::Ready(Ok(3)));
            assert!(!woken.is_woken());
        }
        assert_eq!(readers, [Fd::STDIN]);
        assert_eq!(writers, [Fd::STDOUT, Fd::STDERR]);
    }

    /// The reading end of a pipe is ready once the writing end is closed.
    #[test]
    fn select_pipe_reader_is_ready_if_writer_is_closed() {
        let system = VirtualSystem::new();
        let (reader, writer) = system.pipe().unwrap();
        system.close(writer).unwrap();
        let mut readers = vec![reader];
        let mut writers = vec![];
        {
            let mut select = pin!(system.select(&mut readers, &mut writers, None, None));

            let (poll, woken) = poll_once(select.as_mut());
            assert_eq!(poll, Poll::Ready(Ok(1)));
            assert!(!woken.is_woken());
        }
        assert_eq!(readers, [reader]);
        assert_eq!(writers, []);
    }

    /// The reading end of a pipe is ready if data was written beforehand.
    #[test]
    fn select_pipe_reader_is_ready_if_something_has_been_written() {
        let system = VirtualSystem::new();
        let (reader, writer) = system.pipe().unwrap();
        system.write(writer, &[0]).now_or_never().unwrap().unwrap();
        let mut readers = vec![reader];
        let mut writers = vec![];
        {
            let mut select = pin!(system.select(&mut readers, &mut writers, None, None));

            let (poll, woken) = poll_once(select.as_mut());
            assert_eq!(poll, Poll::Ready(Ok(1)));
            assert!(!woken.is_woken());
        }
        assert_eq!(readers, [reader]);
        assert_eq!(writers, []);
    }

    /// A pending select on an empty pipe is woken by a write and then
    /// reports the reader as ready.
    #[test]
    fn select_pipe_reader_gets_ready_when_some_data_is_written() {
        let system = VirtualSystem::new();
        let (reader, writer) = system.pipe().unwrap();
        let mut readers = vec![reader];
        let mut writers = vec![];
        {
            let mut select = pin!(system.select(&mut readers, &mut writers, None, None));

            let (poll, woken) = poll_once(select.as_mut());
            assert_eq!(poll, Poll::Pending);
            assert!(!woken.is_woken());

            system.write(writer, &[0]).now_or_never().unwrap().unwrap();
            assert!(woken.is_woken());

            let (poll, woken) = poll_once(select.as_mut());
            assert_eq!(poll, Poll::Ready(Ok(1)));
            assert!(!woken.is_woken());
        }
        assert_eq!(readers, [reader]);
        assert_eq!(writers, []);
    }

    /// The writing end of an empty pipe is ready.
    #[test]
    fn select_pipe_writer_is_ready_if_pipe_is_not_full() {
        let system = VirtualSystem::new();
        let (_reader, writer) = system.pipe().unwrap();
        let mut readers = vec![];
        let mut writers = vec![writer];
        {
            let mut select = pin!(system.select(&mut readers, &mut writers, None, None));

            let (poll, woken) = poll_once(select.as_mut());
            assert_eq!(poll, Poll::Ready(Ok(1)));
            assert!(!woken.is_woken());
        }
        assert_eq!(readers, []);
        assert_eq!(writers, [writer]);
    }

    /// A pending select on a pipe filled with `PIPE_SIZE` bytes is woken by
    /// a read and then reports the writer as ready.
    #[test]
    fn select_pipe_writer_gets_ready_when_some_data_is_read() {
        let system = VirtualSystem::new();
        let (reader, writer) = system.pipe().unwrap();
        let mut readers = vec![];
        let mut writers = vec![writer];
        system
            .write(writer, &[0; PIPE_SIZE])
            .now_or_never()
            .unwrap()
            .unwrap();
        {
            let mut select = pin!(system.select(&mut readers, &mut writers, None, None));

            let (poll, woken) = poll_once(select.as_mut());
            assert_eq!(poll, Poll::Pending);
            assert!(!woken.is_woken());

            system
                .read(reader, &mut [0; PIPE_BUF])
                .now_or_never()
                .unwrap()
                .unwrap();
            assert!(woken.is_woken());

            let (poll, woken) = poll_once(select.as_mut());
            assert_eq!(poll, Poll::Ready(Ok(1)));
            assert!(!woken.is_woken());
        }
        assert_eq!(readers, []);
        assert_eq!(writers, [writer]);
    }

    /// Passing the writing end of a pipe as a reader reports it as ready.
    #[test]
    fn select_on_unreadable_fd() {
        let system = VirtualSystem::new();
        let (_reader, writer) = system.pipe().unwrap();
        let mut fds = vec![writer];
        let result = system
            .select(&mut fds, &mut vec![], None, None)
            .now_or_never()
            .unwrap();
        assert_eq!(result, Ok(1));
        assert_eq!(fds, [writer]);
    }

    /// Passing the reading end of a pipe as a writer reports it as ready.
    #[test]
    fn select_on_unwritable_fd() {
        let system = VirtualSystem::new();
        let (reader, _writer) = system.pipe().unwrap();
        let mut fds = vec![reader];
        let result = system
            .select(&mut vec![], &mut fds, None, None)
            .now_or_never()
            .unwrap();
        assert_eq!(result, Ok(1));
        assert_eq!(fds, [reader]);
    }

    /// An FD that is not open fails with `EBADF` and leaves the arguments
    /// unmodified (reader case).
    #[test]
    fn select_on_invalid_fd_for_readers() {
        let system = VirtualSystem::new();
        let mut readers = vec![Fd(17)];
        let mut writers = vec![];
        {
            let mut select = pin!(system.select(&mut readers, &mut writers, None, None));

            let (poll, woken) = poll_once(select.as_mut());
            assert_eq!(poll, Poll::Ready(Err(Errno::EBADF)));
            assert!(!woken.is_woken());
        }
        assert_eq!(readers, [Fd(17)]);
        assert_eq!(writers, []);
    }

    /// An FD that is not open fails with `EBADF` and leaves the arguments
    /// unmodified (writer case).
    #[test]
    fn select_on_invalid_fd_for_writers() {
        let system = VirtualSystem::new();
        let mut readers = vec![];
        let mut writers = vec![Fd(17)];
        {
            let mut select = pin!(system.select(&mut readers, &mut writers, None, None));

            let (poll, woken) = poll_once(select.as_mut());
            assert_eq!(poll, Poll::Ready(Err(Errno::EBADF)));
            assert!(!woken.is_woken());
        }
        assert_eq!(readers, []);
        assert_eq!(writers, [Fd(17)]);
    }

    /// Creates a system whose current process blocks `SIGCHLD` and has its
    /// disposition for `SIGCHLD` set to `Disposition::Catch`.
    fn system_for_catching_sigchld() -> VirtualSystem {
        let system = VirtualSystem::new();
        system
            .sigmask(Some((SigmaskOp::Add, &[SIGCHLD])), None)
            .now_or_never()
            .unwrap()
            .unwrap();
        system.sigaction(SIGCHLD, Disposition::Catch).unwrap();
        system
    }

    /// A signal that is already pending is caught when select applies an
    /// empty mask; the original mask is in effect again afterwards.
    #[test]
    fn select_on_pending_signal() {
        let system = system_for_catching_sigchld();
        let _ = system.current_process_mut().raise_signal(SIGCHLD);
        let mut readers = vec![];
        let mut writers = vec![];
        let mut select = pin!(system.select(&mut readers, &mut writers, None, Some(&[])));

        let (poll, woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Ready(Err(Errno::EINTR)));
        assert!(!woken.is_woken());
        assert_eq!(system.caught_signals(), [SIGCHLD]);

        let mut mask = Vec::new();
        system
            .sigmask(None, Some(&mut mask))
            .now_or_never()
            .unwrap()
            .unwrap();
        assert_eq!(mask, [SIGCHLD]);
    }

    /// A signal raised while select is pending wakes the most recently
    /// registered waker, and the next poll fails with `EINTR`.
    #[test]
    fn select_interrupted_by_signal() {
        let system = VirtualSystem::new();
        system.sigaction(SIGCHLD, Disposition::Catch).unwrap();
        let mut readers = vec![];
        let mut writers = vec![];
        let mut select = pin!(system.select(&mut readers, &mut writers, None, None));

        let (poll, woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Pending);
        assert!(!woken.is_woken());

        // Polling again without any event keeps the future pending.
        let (poll, woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Pending);
        assert!(!woken.is_woken());

        _ = system.current_process_mut().raise_signal(SIGCHLD);
        assert!(woken.is_woken());

        let (poll, woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Ready(Err(Errno::EINTR)));
        assert!(!woken.is_woken());
    }

    /// The temporary mask is in effect while select is pending, a signal
    /// raised meanwhile interrupts it, and the original mask is then restored.
    #[test]
    fn select_on_signal_delivered_while_waiting() {
        let system = system_for_catching_sigchld();
        let mut readers = vec![];
        let mut writers = vec![];
        let mut select = pin!(system.select(&mut readers, &mut writers, None, Some(&[])));

        let (poll, woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Pending);
        assert!(!woken.is_woken());

        // The empty mask passed to select is the current mask for now.
        let mut mask = Vec::new();
        system
            .sigmask(None, Some(&mut mask))
            .now_or_never()
            .unwrap()
            .unwrap();
        assert_eq!(mask, []);

        let (poll, woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Pending);
        assert!(!woken.is_woken());

        let _ = system.current_process_mut().raise_signal(SIGCHLD);
        assert!(woken.is_woken());

        let (poll, woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Ready(Err(Errno::EINTR)));
        assert!(!woken.is_woken());
        assert_eq!(system.caught_signals(), [SIGCHLD]);

        let mut mask = Vec::new();
        system
            .sigmask(None, Some(&mut mask))
            .now_or_never()
            .unwrap()
            .unwrap();
        assert_eq!(mask, [SIGCHLD]);
    }

    /// With FDs that never become ready, select stays pending until the
    /// deadline, is woken exactly when the time reaches it, and then returns
    /// `Ok(0)` with both FD lists cleared.
    #[test]
    fn select_timeout() {
        let system = VirtualSystem::new();
        let now = Instant::now();
        system.state.borrow_mut().now = Some(now);

        // `reader_1` has nothing to read and `writer_2` belongs to a pipe
        // already holding `PIPE_SIZE` bytes.
        let (reader_1, _writer_1) = system.pipe().unwrap();
        let (_reader_2, writer_2) = system.pipe().unwrap();
        system
            .write(writer_2, &[0; PIPE_SIZE])
            .now_or_never()
            .unwrap()
            .unwrap();
        let mut readers = vec![reader_1];
        let mut writers = vec![writer_2];
        let timeout = Duration::new(42, 195);
        {
            let mut select =
                pin!(system.select(&mut readers, &mut writers, Some(timeout), None));

            let (poll, woken) = poll_once(select.as_mut());
            assert_eq!(poll, Poll::Pending);
            assert!(!woken.is_woken());

            // Just short of the deadline: no wake-up yet.
            let time_before_timeout = now + Duration::new(42, 0);
            system.state.borrow_mut().advance_time(time_before_timeout);
            assert!(!woken.is_woken());

            let (poll, woken) = poll_once(select.as_mut());
            assert_eq!(poll, Poll::Pending);
            assert!(!woken.is_woken());

            system.state.borrow_mut().advance_time(now + timeout);
            assert!(woken.is_woken());

            let (poll, _woken) = poll_once(select.as_mut());
            assert_eq!(poll, Poll::Ready(Ok(0)));
        }
        assert_eq!(readers, []);
        assert_eq!(writers, []);
    }

    /// Creates a system in which the current process's parent exists and has
    /// its disposition for `SIGCHLD` set to `Disposition::Catch`.
    fn virtual_system_with_parent_process() -> VirtualSystem {
        let system = VirtualSystem::new();
        let ppid = system.current_process().ppid;
        let mut parent = Process::with_parent_and_group(Pid(1), Pid(1));
        parent.set_disposition(SIGCHLD, Disposition::Catch);
        system.state.borrow_mut().processes.insert(ppid, parent);
        system
    }

    /// A pending `SIGTSTP` is unblocked by the mask passed to select. The
    /// future then stays pending, even past the deadline, until `SIGCONT` is
    /// raised.
    #[test]
    fn select_returns_pending_while_process_is_suspended_1() {
        let system = virtual_system_with_parent_process();
        let now = Instant::now();
        system.state.borrow_mut().now = Some(now);
        system
            .sigmask(Some((SigmaskOp::Add, &[SIGTSTP])), None)
            .now_or_never()
            .unwrap()
            .unwrap();
        system.raise(SIGTSTP).now_or_never().unwrap().unwrap();

        let mut readers = vec![];
        let mut writers = vec![];
        let mut select = pin!(system.select(
            &mut readers,
            &mut writers,
            Some(Duration::from_secs(1)),
            Some(&[])
        ));

        let (poll, woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Pending);
        assert!(!woken.is_woken());

        // The parent has caught SIGCHLD by now.
        {
            let state = system.state.borrow();
            let ppid = state.processes[&system.process_id].ppid;
            assert_eq!(state.processes[&ppid].caught_signals, [SIGCHLD]);
        }

        // Passing the deadline does not complete the future yet.
        system
            .state
            .borrow_mut()
            .advance_time(now + Duration::from_secs(2));

        let (poll, woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Pending);
        assert!(!woken.is_woken());

        system.raise(SIGCONT).now_or_never().unwrap().unwrap();
        assert!(woken.is_woken());

        let (poll, _woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Ready(Ok(0)));
    }

    /// `SIGTSTP` is blocked by the mask passed to select and raised while
    /// select is pending. After the deadline wakes the future, it stays
    /// pending until `SIGCONT` is raised.
    #[test]
    fn select_returns_pending_while_process_is_suspended_2() {
        let system = virtual_system_with_parent_process();
        let now = Instant::now();
        system.state.borrow_mut().now = Some(now);

        let mut readers = vec![];
        let mut writers = vec![];
        let mut select = pin!(system.select(
            &mut readers,
            &mut writers,
            Some(Duration::from_secs(1)),
            Some(&[SIGTSTP])
        ));

        let (poll, woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Pending);
        assert!(!woken.is_woken());

        // The mask passed to select is the current mask for now.
        let mut mask = Vec::new();
        system
            .sigmask(None, Some(&mut mask))
            .now_or_never()
            .unwrap()
            .unwrap();
        assert_eq!(mask, [SIGTSTP]);

        // Raising the blocked signal does not wake the future.
        system.raise(SIGTSTP).now_or_never().unwrap().unwrap();
        assert!(!woken.is_woken());

        let (poll, woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Pending);
        assert!(!woken.is_woken());

        system
            .state
            .borrow_mut()
            .advance_time(now + Duration::from_secs(1));
        assert!(woken.is_woken());

        // The deadline has passed, but the future remains pending.
        let (poll, woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Pending);
        assert!(!woken.is_woken());

        // The parent has caught SIGCHLD by now.
        {
            let state = system.state.borrow();
            let ppid = state.processes[&system.process_id].ppid;
            assert_eq!(state.processes[&ppid].caught_signals, [SIGCHLD]);
        }

        system.raise(SIGCONT).now_or_never().unwrap().unwrap();
        assert!(woken.is_woken());

        let (poll, _woken) = poll_once(select.as_mut());
        assert_eq!(poll, Poll::Ready(Ok(0)));
    }
}