use super::CaughtSignals;
use super::Chdir;
use super::ChildProcessStarter;
use super::Clock;
use super::CpuTimes;
use super::Dir;
use super::Disposition;
use super::Dup;
use super::Errno;
use super::Exec;
use super::Exit;
use super::Fcntl;
use super::FdFlag;
use super::Fork;
use super::Fstat;
use super::GetCwd;
use super::GetPid;
use super::GetPw;
use super::GetRlimit;
use super::GetSigaction;
use super::GetUid;
use super::Gid;
use super::IsExecutableFile;
use super::Isatty;
use super::LimitPair;
use super::Mode;
use super::OfdAccess;
use super::Open;
use super::OpenFlag;
use super::Path;
use super::PathBuf;
use super::Pipe;
use super::Read;
use super::Resource;
use super::Result;
use super::Seek;
use super::Select;
use super::SelectSystem;
use super::SendSignal;
use super::SetPgid;
use super::SetRlimit;
use super::ShellPath;
use super::Sigaction;
use super::Sigmask;
use super::SigmaskOp;
use super::SignalStatus;
use super::SignalSystem;
use super::Signals;
use super::Sysconf;
use super::TcGetPgrp;
use super::TcSetPgrp;
use super::Times;
use super::Uid;
use super::Umask;
use super::UnixString;
use super::Wait;
use super::Write;
use super::signal;
#[cfg(doc)]
use crate::Env;
use crate::io::Fd;
use crate::job::Pid;
use crate::job::ProcessState;
use crate::semantics::ExitStatus;
use crate::system::Close;
use enumset::EnumSet;
use std::cell::RefCell;
use std::convert::Infallible;
use std::ffi::CStr;
use std::ffi::CString;
use std::ffi::c_int;
use std::future::poll_fn;
use std::io::SeekFrom;
use std::ops::RangeInclusive;
use std::rc::Rc;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
/// Cloneable handle to a system wrapped in a [`SelectSystem`].
///
/// The wrapped [`SelectSystem`] is stored in an `Rc<RefCell<_>>`, so cloning a
/// `SharedSystem` yields another handle to the same underlying instance
/// rather than a copy of it.
///
/// This type is deprecated since 0.13.0; use `Concurrent` instead.
#[deprecated(since = "0.13.0", note = "use `Concurrent` instead")]
#[derive(Debug)]
pub struct SharedSystem<S>(pub(super) Rc<RefCell<SelectSystem<S>>>);
#[allow(deprecated)]
impl<S> SharedSystem<S> {
pub fn new(system: S) -> Self {
SharedSystem(Rc::new(RefCell::new(SelectSystem::new(system))))
}
/// Reads from `fd` into `buffer`, suspending the task while the read would
/// block.
///
/// The descriptor is switched to non-blocking mode at the start of the call.
/// A read failing with `EINTR` is retried immediately. A read failing with
/// `EAGAIN`/`EWOULDBLOCK` registers this task as a reader of `fd` in the
/// shared [`SelectSystem`], yields once and then retries. Any other outcome
/// ends the loop and is returned to the caller.
///
/// The previous blocking mode is restored after the loop ends; an error from
/// that restoration is ignored. An error from the initial mode switch is
/// returned immediately.
pub async fn read_async(&self, fd: Fd, buffer: &mut [u8]) -> Result<usize>
where
    S: Fcntl + Read,
{
    let was_nonblocking = self.get_and_set_nonblocking(fd, true)?;

    // Only a weak reference to this slot is handed to the select system, so
    // the strong reference has to live here for the whole call.
    let waker = Rc::new(RefCell::new(None));

    let outcome = loop {
        match self.read(fd, buffer).await {
            // Interrupted: just try again.
            Err(Errno::EINTR) => continue,

            #[allow(unreachable_patterns)]
            Err(Errno::EAGAIN | Errno::EWOULDBLOCK) => {
                // Yield exactly once: the first poll registers the waker and
                // returns `Pending`, the next poll completes so that the
                // read is attempted again.
                let mut registered = false;
                poll_fn(|context| {
                    if registered {
                        return Poll::Ready(());
                    }
                    registered = true;
                    *waker.borrow_mut() = Some(context.waker().clone());
                    let mut select_system = self.0.borrow_mut();
                    select_system.add_reader(fd, Rc::downgrade(&waker));
                    Poll::Pending
                })
                .await;
            }

            other => break other,
        }
    };

    // Best effort: a failure to restore the mode must not mask the result.
    let _ = self.get_and_set_nonblocking(fd, was_nonblocking);
    outcome
}
/// Writes the whole of `buffer` to `fd`, suspending the task while the
/// write would block.
///
/// An empty `buffer` returns `Ok(0)` without touching the descriptor.
/// Otherwise the descriptor is switched to non-blocking mode and writes are
/// repeated until every byte has been accepted. `EINTR` is retried
/// immediately. `EAGAIN`/`EWOULDBLOCK` registers this task as a writer of
/// `fd` in the shared [`SelectSystem`], yields once and then retries.
///
/// On success the total number of bytes written is returned. Any other error
/// ends the loop and is returned as is. The previous blocking mode is
/// restored after the loop ends; an error from that restoration is ignored.
pub async fn write_all(&self, fd: Fd, mut buffer: &[u8]) -> Result<usize>
where
    S: Fcntl + Write,
{
    if buffer.is_empty() {
        return Ok(0);
    }

    let was_nonblocking = self.get_and_set_nonblocking(fd, true)?;

    // Only a weak reference to this slot is handed to the select system, so
    // the strong reference has to live here for the whole call.
    let waker = Rc::new(RefCell::new(None));
    let mut written = 0;

    let outcome = loop {
        let count = match self.write(fd, buffer).await {
            Ok(count) => count,

            // Interrupted: just try again.
            Err(Errno::EINTR) => continue,

            #[allow(unreachable_patterns)]
            Err(Errno::EAGAIN | Errno::EWOULDBLOCK) => {
                // Yield exactly once: the first poll registers the waker and
                // returns `Pending`, the next poll completes so that the
                // write is attempted again.
                let mut registered = false;
                poll_fn(|context| {
                    if registered {
                        return Poll::Ready(());
                    }
                    registered = true;
                    *waker.borrow_mut() = Some(context.waker().clone());
                    let mut select_system = self.0.borrow_mut();
                    select_system.add_writer(fd, Rc::downgrade(&waker));
                    Poll::Pending
                })
                .await;
                continue;
            }

            Err(error) => break Err(error),
        };

        // Account for the bytes that went through and drop them from the
        // remaining slice.
        written += count;
        buffer = &buffer[count..];
        if buffer.is_empty() {
            break Ok(written);
        }
    };

    // Best effort: a failure to restore the mode must not mask the result.
    let _ = self.get_and_set_nonblocking(fd, was_nonblocking);
    outcome
}
/// Writes `message` to the standard error descriptor.
///
/// The bytes are sent with [`write_all`](Self::write_all); its result is
/// discarded, so a failed write is silently ignored.
pub async fn print_error(&self, message: &str)
where
    S: Fcntl + Write,
{
    let bytes = message.as_bytes();
    let _ = self.write_all(Fd::STDERR, bytes).await;
}
/// Suspends the task until the system clock reaches `target`.
///
/// Each poll compares the system's current time with `target`. While the
/// target lies in the future, the task's waker is stored and a timeout for
/// `target` is registered with the shared [`SelectSystem`]. Once the current
/// time is at or past `target`, the future completes.
pub async fn wait_until(&self, target: Instant)
where
    S: Clock,
{
    // Only a weak reference to this slot is handed to the select system, so
    // the strong reference has to live here for the whole call.
    let waker = Rc::new(RefCell::new(None));

    poll_fn(|context| {
        // The borrow is confined to this closure and therefore never held
        // across a suspension point.
        let mut system = self.0.borrow_mut();
        if system.now() < target {
            *waker.borrow_mut() = Some(context.waker().clone());
            system.add_timeout(target, Rc::downgrade(&waker));
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    })
    .await
}
/// Suspends the task until signals are reported as caught, then returns
/// them.
///
/// A signal waker is registered with the shared [`SelectSystem`], which
/// hands back a shared status slot. Each poll takes the slot's content: a
/// `Caught` status completes the future with its signals, otherwise the slot
/// is refilled with an `Expected` status holding the task's current waker.
pub async fn wait_for_signals(&self) -> Rc<[signal::Number]> {
    let shared_status = self.0.borrow_mut().add_signal_waker();

    poll_fn(|context| {
        let mut slot = shared_status.borrow_mut();

        // Move the status out, leaving a placeholder that is overwritten
        // below when the task has to keep waiting.
        let previous = std::mem::replace(&mut *slot, SignalStatus::Expected(None));
        if let SignalStatus::Caught(signals) = previous {
            return Poll::Ready(signals);
        }

        *slot = SignalStatus::Expected(Some(context.waker().clone()));
        Poll::Pending
    })
    .await
}
/// Suspends the task until `signal` is among the caught signals.
///
/// Repeatedly awaits [`wait_for_signals`](Self::wait_for_signals) and
/// returns as soon as one of the reported batches contains `signal`;
/// batches without it are discarded.
pub async fn wait_for_signal(&self, signal: signal::Number) {
    loop {
        let caught = self.wait_for_signals().await;
        if caught.contains(&signal) {
            return;
        }
    }
}
pub fn select(&self, poll: bool) -> Result<()>
where
S: Select + CaughtSignals + Clock,
{
use std::task::{Context, Poll, Waker};
let mut future = std::pin::pin!(SelectSystem::select(&self.0, poll));
let mut context = Context::from_waker(Waker::noop());
match future.as_mut().poll(&mut context) {
Poll::Ready(result) => result,
Poll::Pending => Ok(()),
}
}
/// Asynchronous counterpart of [`select`](Self::select).
///
/// Awaits `SelectSystem::select` on the shared system with its second
/// argument fixed to `false`, and returns its result.
pub async fn select_async(&self) -> Result<()>
where
    S: Select + CaughtSignals + Clock,
{
    let shared = &self.0;
    SelectSystem::select(shared, false).await
}
/// Asks the wrapped system for a new child process starter.
///
/// Forwards to the `new_child_process` method reached through the shared
/// borrow and returns its result unchanged.
pub fn new_child_process(&self) -> Result<ChildProcessStarter<S>>
where
    S: Fork,
{
    let inner = self.0.borrow();
    inner.new_child_process()
}
}
#[allow(deprecated)]
impl<S> Clone for SharedSystem<S> {
fn clone(&self) -> Self {
SharedSystem(self.0.clone())
}
}
#[allow(deprecated)]
/// Forwards file status queries to the wrapped system.
impl<T: Fstat> Fstat for SharedSystem<T> {
    type Stat = T::Stat;

    /// Forwards to the wrapped system's `fstat`.
    fn fstat(&self, fd: Fd) -> Result<Self::Stat> {
        let inner = self.0.borrow();
        inner.fstat(fd)
    }

    /// Forwards to the wrapped system's `fstatat`.
    fn fstatat(&self, dir_fd: Fd, path: &CStr, follow_symlinks: bool) -> Result<Self::Stat> {
        let inner = self.0.borrow();
        inner.fstatat(dir_fd, path, follow_symlinks)
    }
}
#[allow(deprecated)]
/// Forwards executable-file checks to the wrapped system.
impl<T: IsExecutableFile> IsExecutableFile for SharedSystem<T> {
    /// Forwards to the wrapped system's `is_executable_file`.
    fn is_executable_file(&self, path: &CStr) -> bool {
        let inner = self.0.borrow();
        inner.is_executable_file(path)
    }
}
#[allow(deprecated)]
/// Forwards pipe creation to the wrapped system.
impl<T: Pipe> Pipe for SharedSystem<T> {
    /// Forwards to the wrapped system's `pipe`.
    fn pipe(&self) -> Result<(Fd, Fd)> {
        let inner = self.0.borrow();
        inner.pipe()
    }
}
#[allow(deprecated)]
/// Forwards file descriptor duplication to the wrapped system.
impl<T: Dup> Dup for SharedSystem<T> {
    /// Forwards to the wrapped system's `dup`.
    fn dup(&self, from: Fd, to_min: Fd, flags: EnumSet<FdFlag>) -> Result<Fd> {
        let inner = self.0.borrow();
        inner.dup(from, to_min, flags)
    }

    /// Forwards to the wrapped system's `dup2`.
    fn dup2(&self, from: Fd, to: Fd) -> Result<Fd> {
        let inner = self.0.borrow();
        inner.dup2(from, to)
    }
}
#[allow(deprecated)]
/// Forwards file and directory opening to the wrapped system.
impl<T: Open> Open for SharedSystem<T> {
    /// Forwards to the wrapped system's `open`.
    ///
    /// The shared borrow is released when this method returns; the returned
    /// future does not hold it.
    fn open(
        &self,
        path: &CStr,
        access: OfdAccess,
        flags: EnumSet<OpenFlag>,
        mode: Mode,
    ) -> impl Future<Output = Result<Fd>> + use<T> {
        let inner = self.0.borrow();
        inner.open(path, access, flags, mode)
    }

    /// Forwards to the wrapped system's `open_tmpfile`.
    fn open_tmpfile(&self, parent_dir: &Path) -> Result<Fd> {
        let inner = self.0.borrow();
        inner.open_tmpfile(parent_dir)
    }

    /// Forwards to the wrapped system's `fdopendir`.
    fn fdopendir(&self, fd: Fd) -> Result<impl Dir + use<T>> {
        let inner = self.0.borrow();
        inner.fdopendir(fd)
    }

    /// Forwards to the wrapped system's `opendir`.
    fn opendir(&self, path: &CStr) -> Result<impl Dir + use<T>> {
        let inner = self.0.borrow();
        inner.opendir(path)
    }
}
#[allow(deprecated)]
/// Forwards file descriptor closing to the wrapped system.
impl<T: Close> Close for SharedSystem<T> {
    /// Forwards to the wrapped system's `close`.
    fn close(&self, fd: Fd) -> Result<()> {
        let inner = self.0.borrow();
        inner.close(fd)
    }
}
#[allow(deprecated)]
/// Forwards file descriptor control operations to the wrapped system.
impl<T: Fcntl> Fcntl for SharedSystem<T> {
    /// Forwards to the wrapped system's `ofd_access`.
    fn ofd_access(&self, fd: Fd) -> Result<OfdAccess> {
        let inner = self.0.borrow();
        inner.ofd_access(fd)
    }

    /// Forwards to the wrapped system's `get_and_set_nonblocking`.
    fn get_and_set_nonblocking(&self, fd: Fd, nonblocking: bool) -> Result<bool> {
        let inner = self.0.borrow();
        inner.get_and_set_nonblocking(fd, nonblocking)
    }

    /// Forwards to the wrapped system's `fcntl_getfd`.
    fn fcntl_getfd(&self, fd: Fd) -> Result<EnumSet<FdFlag>> {
        let inner = self.0.borrow();
        inner.fcntl_getfd(fd)
    }

    /// Forwards to the wrapped system's `fcntl_setfd`.
    fn fcntl_setfd(&self, fd: Fd, flags: EnumSet<FdFlag>) -> Result<()> {
        let inner = self.0.borrow();
        inner.fcntl_setfd(fd, flags)
    }
}
#[allow(deprecated)]
/// Forwards reads to the wrapped system.
impl<T: Read> Read for SharedSystem<T> {
    /// Forwards to the wrapped system's `read`.
    ///
    /// The shared borrow is released when this method returns; the returned
    /// future does not hold it.
    fn read<'a>(
        &self,
        fd: Fd,
        buffer: &'a mut [u8],
    ) -> impl Future<Output = Result<usize>> + use<'a, T> {
        let inner = self.0.borrow();
        inner.read(fd, buffer)
    }
}
#[allow(deprecated)]
/// Forwards writes to the wrapped system.
impl<T: Write> Write for SharedSystem<T> {
    /// Forwards to the wrapped system's `write`.
    ///
    /// The shared borrow is released when this method returns; the returned
    /// future does not hold it.
    fn write<'a>(
        &self,
        fd: Fd,
        buffer: &'a [u8],
    ) -> impl Future<Output = Result<usize>> + use<'a, T> {
        let inner = self.0.borrow();
        inner.write(fd, buffer)
    }
}
#[allow(deprecated)]
/// Forwards file offset changes to the wrapped system.
impl<T: Seek> Seek for SharedSystem<T> {
    /// Forwards to the wrapped system's `lseek`.
    fn lseek(&self, fd: Fd, position: SeekFrom) -> Result<u64> {
        let inner = self.0.borrow();
        inner.lseek(fd, position)
    }
}
#[allow(deprecated)]
/// Forwards file mode creation mask changes to the wrapped system.
impl<T: Umask> Umask for SharedSystem<T> {
    /// Forwards to the wrapped system's `umask`.
    fn umask(&self, new_mask: Mode) -> Mode {
        let inner = self.0.borrow();
        inner.umask(new_mask)
    }
}
#[allow(deprecated)]
/// Forwards working directory queries to the wrapped system.
impl<T: GetCwd> GetCwd for SharedSystem<T> {
    /// Forwards to the wrapped system's `getcwd`.
    fn getcwd(&self) -> Result<PathBuf> {
        let inner = self.0.borrow();
        inner.getcwd()
    }
}
#[allow(deprecated)]
/// Forwards working directory changes to the wrapped system.
impl<T: Chdir> Chdir for SharedSystem<T> {
    /// Forwards to the wrapped system's `chdir`.
    fn chdir(&self, path: &CStr) -> Result<()> {
        let inner = self.0.borrow();
        inner.chdir(path)
    }
}
#[allow(deprecated)]
/// Forwards clock queries to the wrapped system.
impl<T: Clock> Clock for SharedSystem<T> {
    /// Forwards to the wrapped system's `now`.
    fn now(&self) -> Instant {
        let inner = self.0.borrow();
        inner.now()
    }
}
#[allow(deprecated)]
/// Forwards CPU time queries to the wrapped system.
impl<T: Times> Times for SharedSystem<T> {
    /// Forwards to the wrapped system's `times`.
    fn times(&self) -> Result<CpuTimes> {
        let inner = self.0.borrow();
        inner.times()
    }
}
#[allow(deprecated)]
/// Forwards process ID queries to the wrapped system.
impl<T: GetPid> GetPid for SharedSystem<T> {
    /// Forwards to the wrapped system's `getsid`.
    fn getsid(&self, pid: Pid) -> Result<Pid> {
        let inner = self.0.borrow();
        inner.getsid(pid)
    }

    /// Forwards to the wrapped system's `getpid`.
    fn getpid(&self) -> Pid {
        let inner = self.0.borrow();
        inner.getpid()
    }

    /// Forwards to the wrapped system's `getppid`.
    fn getppid(&self) -> Pid {
        let inner = self.0.borrow();
        inner.getppid()
    }

    /// Forwards to the wrapped system's `getpgrp`.
    fn getpgrp(&self) -> Pid {
        let inner = self.0.borrow();
        inner.getpgrp()
    }
}
#[allow(deprecated)]
/// Forwards process group changes to the wrapped system.
impl<T: SetPgid> SetPgid for SharedSystem<T> {
    /// Forwards to the wrapped system's `setpgid`.
    fn setpgid(&self, pid: Pid, pgid: Pid) -> Result<()> {
        let inner = self.0.borrow();
        inner.setpgid(pid, pgid)
    }
}
#[allow(deprecated)]
impl<T: Signals> Signals for SharedSystem<T> {
const SIGABRT: signal::Number = T::SIGABRT;
const SIGALRM: signal::Number = T::SIGALRM;
const SIGBUS: signal::Number = T::SIGBUS;
const SIGCHLD: signal::Number = T::SIGCHLD;
const SIGCLD: Option<signal::Number> = T::SIGCLD;
const SIGCONT: signal::Number = T::SIGCONT;
const SIGEMT: Option<signal::Number> = T::SIGEMT;
const SIGFPE: signal::Number = T::SIGFPE;
const SIGHUP: signal::Number = T::SIGHUP;
const SIGILL: signal::Number = T::SIGILL;
const SIGINFO: Option<signal::Number> = T::SIGINFO;
const SIGINT: signal::Number = T::SIGINT;
const SIGIO: Option<signal::Number> = T::SIGIO;
const SIGIOT: signal::Number = T::SIGIOT;
const SIGKILL: signal::Number = T::SIGKILL;
const SIGLOST: Option<signal::Number> = T::SIGLOST;
const SIGPIPE: signal::Number = T::SIGPIPE;
const SIGPOLL: Option<signal::Number> = T::SIGPOLL;
const SIGPROF: signal::Number = T::SIGPROF;
const SIGPWR: Option<signal::Number> = T::SIGPWR;
const SIGQUIT: signal::Number = T::SIGQUIT;
const SIGSEGV: signal::Number = T::SIGSEGV;
const SIGSTKFLT: Option<signal::Number> = T::SIGSTKFLT;
const SIGSTOP: signal::Number = T::SIGSTOP;
const SIGSYS: signal::Number = T::SIGSYS;
const SIGTERM: signal::Number = T::SIGTERM;
const SIGTHR: Option<signal::Number> = T::SIGTHR;
const SIGTRAP: signal::Number = T::SIGTRAP;
const SIGTSTP: signal::Number = T::SIGTSTP;
const SIGTTIN: signal::Number = T::SIGTTIN;
const SIGTTOU: signal::Number = T::SIGTTOU;
const SIGURG: signal::Number = T::SIGURG;
const SIGUSR1: signal::Number = T::SIGUSR1;
const SIGUSR2: signal::Number = T::SIGUSR2;
const SIGVTALRM: signal::Number = T::SIGVTALRM;
const SIGWINCH: signal::Number = T::SIGWINCH;
const SIGXCPU: signal::Number = T::SIGXCPU;
const SIGXFSZ: signal::Number = T::SIGXFSZ;
fn sigrt_range(&self) -> Option<RangeInclusive<signal::Number>> {
self.0.borrow().sigrt_range()
}
fn iter_sigrt(&self) -> impl DoubleEndedIterator<Item = signal::Number> + use<T> {
self.0.borrow().iter_sigrt()
}
fn validate_signal(&self, number: signal::RawNumber) -> Option<(signal::Name, signal::Number)> {
self.0.borrow().validate_signal(number)
}
}
#[allow(deprecated)]
/// Forwards signal mask operations past the [`SelectSystem`] layer.
impl<T: Sigmask> Sigmask for SharedSystem<T> {
    /// Calls `sigmask` on the value obtained by dereferencing the borrowed
    /// [`SelectSystem`] once more, rather than on the `SelectSystem` itself.
    fn sigmask(
        &self,
        op: Option<(SigmaskOp, &[signal::Number])>,
        old_mask: Option<&mut Vec<signal::Number>>,
    ) -> impl Future<Output = Result<()>> + use<T> {
        let guard = self.0.borrow();
        (**guard).sigmask(op, old_mask)
    }
}
#[allow(deprecated)]
/// Forwards signal disposition queries to the wrapped system.
// NOTE(review): the bound is `T: Sigaction` rather than `T: GetSigaction`;
// presumably `Sigaction` implies `GetSigaction` - confirm before relaxing it.
impl<T: Sigaction> GetSigaction for SharedSystem<T> {
    /// Forwards to the wrapped system's `get_sigaction`.
    fn get_sigaction(&self, signal: signal::Number) -> Result<Disposition> {
        let inner = self.0.borrow();
        inner.get_sigaction(signal)
    }
}
#[allow(deprecated)]
/// Forwards signal disposition changes to the wrapped system.
impl<T: Sigaction> Sigaction for SharedSystem<T> {
    /// Forwards to the wrapped system's `sigaction`.
    fn sigaction(&self, signal: signal::Number, action: Disposition) -> Result<Disposition> {
        let inner = self.0.borrow();
        inner.sigaction(signal, action)
    }
}
#[allow(deprecated)]
/// Forwards caught-signal queries to the wrapped system.
impl<T: CaughtSignals> CaughtSignals for SharedSystem<T> {
    /// Forwards to the wrapped system's `caught_signals`.
    fn caught_signals(&self) -> Vec<signal::Number> {
        let inner = self.0.borrow();
        inner.caught_signals()
    }
}
#[allow(deprecated)]
/// Forwards signal delivery requests to the wrapped system.
impl<T: SendSignal> SendSignal for SharedSystem<T> {
    /// Forwards to the wrapped system's `kill`.
    ///
    /// The shared borrow is released when this method returns; the returned
    /// future does not hold it.
    fn kill(
        &self,
        target: Pid,
        signal: Option<signal::Number>,
    ) -> impl Future<Output = Result<()>> + use<T> {
        let inner = self.0.borrow();
        inner.kill(target, signal)
    }

    /// Forwards to the wrapped system's `raise`.
    ///
    /// The shared borrow is released when this method returns; the returned
    /// future does not hold it.
    fn raise(&self, signal: signal::Number) -> impl Future<Output = Result<()>> + use<T> {
        let inner = self.0.borrow();
        inner.raise(signal)
    }
}
#[allow(deprecated)]
/// Forwards raw `select` calls past the [`SelectSystem`] layer.
impl<T: Select> Select for SharedSystem<T> {
    /// Calls `select` on the value obtained by dereferencing the borrowed
    /// [`SelectSystem`] once more, rather than on the `SelectSystem` itself.
    fn select<'a>(
        &self,
        readers: &'a mut Vec<Fd>,
        writers: &'a mut Vec<Fd>,
        timeout: Option<Duration>,
        signal_mask: Option<&[signal::Number]>,
    ) -> impl Future<Output = Result<c_int>> + use<'a, T> {
        let guard = self.0.borrow();
        (**guard).select(readers, writers, timeout, signal_mask)
    }
}
#[allow(deprecated)]
/// Forwards terminal checks to the wrapped system.
impl<T: Isatty> Isatty for SharedSystem<T> {
    /// Forwards to the wrapped system's `isatty`.
    fn isatty(&self, fd: Fd) -> bool {
        let inner = self.0.borrow();
        inner.isatty(fd)
    }
}
#[allow(deprecated)]
/// Forwards `tcgetpgrp` calls to the wrapped system.
impl<T: TcGetPgrp> TcGetPgrp for SharedSystem<T> {
    /// Forwards to the wrapped system's `tcgetpgrp`.
    fn tcgetpgrp(&self, fd: Fd) -> Result<Pid> {
        let inner = self.0.borrow();
        inner.tcgetpgrp(fd)
    }
}
#[allow(deprecated)]
/// Forwards `tcsetpgrp` calls to the wrapped system.
impl<T: TcSetPgrp> TcSetPgrp for SharedSystem<T> {
    /// Forwards to the wrapped system's `tcsetpgrp`.
    ///
    /// The shared borrow is released when this method returns; the returned
    /// future does not hold it.
    fn tcsetpgrp(&self, fd: Fd, pgid: Pid) -> impl Future<Output = Result<()>> + use<T> {
        let inner = self.0.borrow();
        inner.tcsetpgrp(fd, pgid)
    }
}
#[allow(deprecated)]
/// Forwards child process waiting to the wrapped system.
impl<T: Wait> Wait for SharedSystem<T> {
    /// Forwards to the wrapped system's `wait`.
    fn wait(&self, target: Pid) -> Result<Option<(Pid, ProcessState)>> {
        let inner = self.0.borrow();
        inner.wait(target)
    }
}
#[allow(deprecated)]
/// Forwards program execution to the wrapped system.
impl<T: Exec> Exec for SharedSystem<T> {
    /// Forwards to the wrapped system's `execve`.
    ///
    /// The shared borrow is released when this method returns; the returned
    /// future does not hold it.
    fn execve(
        &self,
        path: &CStr,
        args: &[CString],
        envs: &[CString],
    ) -> impl Future<Output = Result<Infallible>> + use<T> {
        let inner = self.0.borrow();
        inner.execve(path, args, envs)
    }
}
#[allow(deprecated)]
/// Forwards process termination to the wrapped system.
impl<T: Exit> Exit for SharedSystem<T> {
    /// Forwards to the wrapped system's `exit`.
    ///
    /// The shared borrow is released when this method returns; the returned
    /// future does not hold it.
    fn exit(&self, exit_status: ExitStatus) -> impl Future<Output = Infallible> + use<T> {
        let inner = self.0.borrow();
        inner.exit(exit_status)
    }
}
#[allow(deprecated)]
/// Forwards user and group ID queries to the wrapped system.
impl<T: GetUid> GetUid for SharedSystem<T> {
    /// Forwards to the wrapped system's `getuid`.
    fn getuid(&self) -> Uid {
        let inner = self.0.borrow();
        inner.getuid()
    }

    /// Forwards to the wrapped system's `geteuid`.
    fn geteuid(&self) -> Uid {
        let inner = self.0.borrow();
        inner.geteuid()
    }

    /// Forwards to the wrapped system's `getgid`.
    fn getgid(&self) -> Gid {
        let inner = self.0.borrow();
        inner.getgid()
    }

    /// Forwards to the wrapped system's `getegid`.
    fn getegid(&self) -> Gid {
        let inner = self.0.borrow();
        inner.getegid()
    }
}
#[allow(deprecated)]
/// Forwards `getpwnam_dir` lookups to the wrapped system.
impl<T: GetPw> GetPw for SharedSystem<T> {
    /// Forwards to the wrapped system's `getpwnam_dir`.
    fn getpwnam_dir(&self, name: &CStr) -> Result<Option<PathBuf>> {
        let inner = self.0.borrow();
        inner.getpwnam_dir(name)
    }
}
#[allow(deprecated)]
/// Forwards `confstr_path` queries to the wrapped system.
impl<T: Sysconf> Sysconf for SharedSystem<T> {
    /// Forwards to the wrapped system's `confstr_path`.
    fn confstr_path(&self) -> Result<UnixString> {
        let inner = self.0.borrow();
        inner.confstr_path()
    }
}
#[allow(deprecated)]
/// Forwards shell path queries to the wrapped system.
impl<T: ShellPath> ShellPath for SharedSystem<T> {
    /// Forwards to the wrapped system's `shell_path`.
    fn shell_path(&self) -> CString {
        let inner = self.0.borrow();
        inner.shell_path()
    }
}
#[allow(deprecated)]
/// Forwards resource limit queries to the wrapped system.
impl<T: GetRlimit> GetRlimit for SharedSystem<T> {
    /// Forwards to the wrapped system's `getrlimit`.
    fn getrlimit(&self, resource: Resource) -> Result<LimitPair> {
        let inner = self.0.borrow();
        inner.getrlimit(resource)
    }
}
#[allow(deprecated)]
/// Forwards resource limit changes to the wrapped system.
impl<T: SetRlimit> SetRlimit for SharedSystem<T> {
    /// Forwards to the wrapped system's `setrlimit`.
    fn setrlimit(&self, resource: Resource, limits: LimitPair) -> Result<()> {
        let inner = self.0.borrow();
        inner.setrlimit(resource, limits)
    }
}
#[allow(deprecated)]
impl<S: Signals + Sigmask + Sigaction> SignalSystem for SharedSystem<S> {
#[inline]
fn get_disposition(&self, signal: signal::Number) -> Result<Disposition> {
self.0.borrow().get_disposition(signal)
}
#[inline]
fn set_disposition(
&self,
signal: signal::Number,
disposition: Disposition,
) -> impl Future<Output = Result<Disposition>> + use<S> {
let this = Rc::clone(&self.0);
async move { SelectSystem::set_disposition(&this, signal, disposition).await }
}
}
#[cfg(test)]
#[allow(deprecated)]
mod tests {
use super::super::r#virtual::PIPE_SIZE;
use super::super::r#virtual::VirtualSystem;
use super::super::r#virtual::{SIGCHLD, SIGINT, SIGTERM, SIGUSR1};
use super::*;
use crate::test_helper::WakeFlag;
use assert_matches::assert_matches;
use futures_util::FutureExt as _;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use std::time::Duration;
// `read_async` completes on the first poll when the pipe already holds data.
#[test]
fn shared_system_read_async_ready() {
let system = SharedSystem::new(VirtualSystem::new());
let (reader, writer) = system.pipe().unwrap();
// Put one byte into the pipe before reading.
system.write(writer, &[42]).now_or_never().unwrap().unwrap();
let mut buffer = [0; 2];
let result = system.read_async(reader, &mut buffer).now_or_never();
// Exactly the one available byte is read, although the buffer holds two.
assert_eq!(result, Some(Ok(1)));
assert_eq!(buffer[..1], [42]);
}
// `read_async` stays pending on an empty pipe and completes once data has
// been written to it.
#[test]
fn shared_system_read_async_not_ready_at_first() {
let system = VirtualSystem::new();
let process_id = system.process_id;
let state = Rc::clone(&system.state);
let system = SharedSystem::new(system);
// A second handle to the same system, used to call `select` while the
// first handle is borrowed by the read future.
let system2 = system.clone();
let (reader, writer) = system.pipe().unwrap();
let mut context = Context::from_waker(Waker::noop());
let mut buffer = [0; 2];
let mut future = Box::pin(system.read_async(reader, &mut buffer));
// Nothing has been written yet, so the first poll must be pending.
let result = future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Pending);
// `select` succeeds, but the pipe is still empty, so the read stays pending.
let result = system2.select(false);
assert_eq!(result, Ok(()));
let result = future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Pending);
// Write one byte directly into the open file description behind `writer`.
state.borrow_mut().processes[&process_id].fds[&writer]
.open_file_description
.borrow_mut()
.write(&[56])
.unwrap();
let result = future.as_mut().poll(&mut context);
// Drop the future to release its mutable borrow of `buffer`.
drop(future);
assert_eq!(result, Poll::Ready(Ok(1)));
assert_eq!(buffer[..1], [56]);
}
// `write_all` completes on the first poll when the pipe has room.
#[test]
fn shared_system_write_all_ready() {
let system = SharedSystem::new(VirtualSystem::new());
let (reader, writer) = system.pipe().unwrap();
let result = system.write_all(writer, &[17]).now_or_never().unwrap();
assert_eq!(result, Ok(1));
// Read the byte back to check that it actually reached the pipe.
let mut buffer = [0; 2];
system
.read(reader, &mut buffer)
.now_or_never()
.unwrap()
.unwrap();
assert_eq!(buffer[..1], [17]);
}
// `write_all` stays pending until the whole buffer has been accepted by the
// pipe, and the bytes arrive in order.
#[test]
fn shared_system_write_all_not_ready_at_first() {
let system = VirtualSystem::new();
let process_id = system.process_id;
let state = Rc::clone(&system.state);
let system = SharedSystem::new(system);
let (reader, writer) = system.pipe().unwrap();
// Pre-fill the pipe with `PIPE_SIZE` bytes of 42.
state.borrow_mut().processes[&process_id].fds[&writer]
.open_file_description
.borrow_mut()
.write(&[42; PIPE_SIZE])
.unwrap();
let mut context = Context::from_waker(Waker::noop());
// Distinct bytes at both ends of the payload make ordering checkable.
let mut out_buffer = [87; PIPE_SIZE];
out_buffer[0] = 0;
out_buffer[1] = 1;
out_buffer[PIPE_SIZE - 2] = 0xFE;
out_buffer[PIPE_SIZE - 1] = 0xFF;
let mut future = Box::pin(system.write_all(writer, &out_buffer));
// With the pipe pre-filled, the first poll is pending.
let result = future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Pending);
// Drain all but one of the pre-filled bytes.
let mut in_buffer = [0; PIPE_SIZE - 1];
state.borrow_mut().processes[&process_id].fds[&reader]
.open_file_description
.borrow_mut()
.read(&mut in_buffer)
.unwrap();
assert_eq!(in_buffer, [42; PIPE_SIZE - 1]);
// The write is still pending after this drain.
let result = future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Pending);
// Drain the last pre-filled byte.
in_buffer[0] = 0;
state.borrow_mut().processes[&process_id].fds[&reader]
.open_file_description
.borrow_mut()
.read(&mut in_buffer[..1])
.unwrap();
assert_eq!(in_buffer[..1], [42; 1]);
// Now the write completes and reports the full length.
let result = future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Ready(Ok(out_buffer.len())));
// Read the payload back in two steps and compare it with what was sent.
state.borrow_mut().processes[&process_id].fds[&reader]
.open_file_description
.borrow_mut()
.read(&mut in_buffer)
.unwrap();
assert_eq!(in_buffer, out_buffer[..PIPE_SIZE - 1]);
state.borrow_mut().processes[&process_id].fds[&reader]
.open_file_description
.borrow_mut()
.read(&mut in_buffer)
.unwrap();
assert_eq!(in_buffer[..1], out_buffer[PIPE_SIZE - 1..]);
}
// Writing an empty buffer completes immediately with `Ok(0)`, even though
// the pipe has been pre-filled with `PIPE_SIZE` bytes.
#[test]
fn shared_system_write_all_empty() {
let system = VirtualSystem::new();
let process_id = system.process_id;
let state = Rc::clone(&system.state);
let system = SharedSystem::new(system);
let (_reader, writer) = system.pipe().unwrap();
// Pre-fill the pipe with `PIPE_SIZE` bytes.
state.borrow_mut().processes[&process_id].fds[&writer]
.open_file_description
.borrow_mut()
.write(&[0; PIPE_SIZE])
.unwrap();
let mut context = Context::from_waker(Waker::noop());
let mut future = Box::pin(system.write_all(writer, &[]));
let result = future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Ready(Ok(0)));
}
// `wait_until` is pending before the target time, is woken by `select` once
// the virtual clock has reached the target, and then completes.
#[test]
fn shared_system_wait_until() {
let system = VirtualSystem::new();
let state = Rc::clone(&system.state);
let system = SharedSystem::new(system);
// Pin the virtual clock to a known starting instant.
let start = Instant::now();
state.borrow_mut().now = Some(start);
let target = start + Duration::from_millis(1_125);
let mut future = Box::pin(system.wait_until(target));
// A flag-backed waker makes the wake-up observable.
let wake_flag = Arc::new(WakeFlag::new());
let waker = Waker::from(wake_flag.clone());
let mut context = Context::from_waker(&waker);
let poll = future.as_mut().poll(&mut context);
assert_eq!(poll, Poll::Pending);
assert!(!wake_flag.is_woken());
// Move the clock to the target; the wake-up is observed after `select`.
state.borrow_mut().advance_time(target);
system.select(false).unwrap();
assert!(wake_flag.is_woken());
// Poll again with a fresh waker; the future must now be ready.
let wake_flag = Arc::new(WakeFlag::new());
let waker = Waker::from(wake_flag.clone());
let mut context = Context::from_waker(&waker);
let poll = future.as_mut().poll(&mut context);
assert_eq!(poll, Poll::Ready(()));
}
// `wait_for_signals` reports exactly the signals that were raised, and only
// after `select` has run.
#[test]
fn shared_system_wait_for_signals() {
let system = VirtualSystem::new();
let process_id = system.process_id;
let state = Rc::clone(&system.state);
let system = SharedSystem::new(system);
// Catch three signals; only two of them are raised below.
system
.set_disposition(SIGCHLD, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
system
.set_disposition(SIGINT, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
system
.set_disposition(SIGUSR1, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
let mut context = Context::from_waker(Waker::noop());
let mut future = Box::pin(system.wait_for_signals());
let result = future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Pending);
{
let mut state = state.borrow_mut();
let process = state.processes.get_mut(&process_id).unwrap();
// The caught signals are expected to be blocked at this point.
assert!(process.blocked_signals().contains(&SIGCHLD));
assert!(process.blocked_signals().contains(&SIGINT));
assert!(process.blocked_signals().contains(&SIGUSR1));
let _ = process.raise_signal(SIGCHLD);
let _ = process.raise_signal(SIGINT);
}
// Raising the signals alone does not complete the future.
let result = future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Pending);
system.select(false).unwrap();
// After `select`, both raised signals (and only those) are reported.
let result = future.as_mut().poll(&mut context);
assert_matches!(result, Poll::Ready(signals) => {
assert_eq!(signals.len(), 2);
assert!(signals.contains(&SIGCHLD));
assert!(signals.contains(&SIGINT));
});
}
// `wait_for_signal` completes once the awaited signal has been raised and
// `select` has run.
#[test]
fn shared_system_wait_for_signal_returns_on_caught() {
let system = VirtualSystem::new();
let process_id = system.process_id;
let state = Rc::clone(&system.state);
let system = SharedSystem::new(system);
system
.set_disposition(SIGCHLD, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
let mut context = Context::from_waker(Waker::noop());
let mut future = Box::pin(system.wait_for_signal(SIGCHLD));
let result = future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Pending);
{
let mut state = state.borrow_mut();
let process = state.processes.get_mut(&process_id).unwrap();
// The caught signal is expected to be blocked at this point.
assert!(process.blocked_signals().contains(&SIGCHLD));
let _ = process.raise_signal(SIGCHLD);
}
// Raising the signal alone does not complete the future.
let result = future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Pending);
system.select(false).unwrap();
let result = future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Ready(()));
}
// `wait_for_signal` stays pending when only signals other than the awaited
// one are raised.
#[test]
fn shared_system_wait_for_signal_ignores_irrelevant_signals() {
let system = VirtualSystem::new();
let process_id = system.process_id;
let state = Rc::clone(&system.state);
let system = SharedSystem::new(system);
system
.set_disposition(SIGINT, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
system
.set_disposition(SIGTERM, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
let mut context = Context::from_waker(Waker::noop());
// Wait for SIGINT, which is never raised in this test.
let mut future = Box::pin(system.wait_for_signal(SIGINT));
let result = future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Pending);
{
let mut state = state.borrow_mut();
let process = state.processes.get_mut(&process_id).unwrap();
// SIGCHLD has no `Catch` disposition here; SIGTERM does.
let _ = process.raise_signal(SIGCHLD);
let _ = process.raise_signal(SIGTERM);
}
system.select(false).unwrap();
let result = future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Pending);
}
// After `select`, the raised signals are no longer pending, while they are
// still blocked.
#[test]
fn shared_system_select_consumes_all_pending_signals() {
let system = VirtualSystem::new();
let process_id = system.process_id;
let state = Rc::clone(&system.state);
let system = SharedSystem::new(system);
system
.set_disposition(SIGINT, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
system
.set_disposition(SIGTERM, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
{
let mut state = state.borrow_mut();
let process = state.processes.get_mut(&process_id).unwrap();
let _ = process.raise_signal(SIGINT);
let _ = process.raise_signal(SIGTERM);
}
system.select(false).unwrap();
let state = state.borrow();
let process = state.processes.get(&process_id).unwrap();
// Both signals are still in the blocked set...
let blocked = process.blocked_signals();
assert!(blocked.contains(&SIGINT));
assert!(blocked.contains(&SIGTERM));
// ...but neither is left in the pending set.
let pending = process.pending_signals();
assert!(!pending.contains(&SIGINT));
assert!(!pending.contains(&SIGTERM));
}
// A `select` that makes a reader ready does not complete a concurrent
// `wait_for_signals` future.
#[test]
fn shared_system_select_does_not_wake_signal_waiters_on_io() {
let system = VirtualSystem::new();
// Three handles to the same system: reader, signal waiter and driver.
let system_1 = SharedSystem::new(system);
let system_2 = system_1.clone();
let system_3 = system_1.clone();
let (reader, writer) = system_1.pipe().unwrap();
system_2
.set_disposition(SIGCHLD, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
let mut buffer = [0];
let mut read_future = Box::pin(system_1.read_async(reader, &mut buffer));
let mut signal_future = Box::pin(system_2.wait_for_signals());
let mut context = Context::from_waker(Waker::noop());
// Both futures start out pending.
let result = read_future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Pending);
let result = signal_future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Pending);
// Make the pipe readable, then run `select`.
system_3
.write(writer, &[42])
.now_or_never()
.unwrap()
.unwrap();
system_3.select(false).unwrap();
// The read completes; the signal waiter is still pending.
let result = read_future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Ready(Ok(1)));
let result = signal_future.as_mut().poll(&mut context);
assert_eq!(result, Poll::Pending);
}
// `select(true)` returns without advancing the virtual clock, so a pending
// timeout stays pending.
#[test]
fn shared_system_select_poll() {
let system = VirtualSystem::new();
let state = Rc::clone(&system.state);
let system = SharedSystem::new(system);
// Pin the virtual clock to a known starting instant.
let start = Instant::now();
state.borrow_mut().now = Some(start);
let target = start + Duration::from_millis(1_125);
let mut future = Box::pin(system.wait_until(target));
let mut context = Context::from_waker(Waker::noop());
let poll = future.as_mut().poll(&mut context);
assert_eq!(poll, Poll::Pending);
system.select(true).unwrap();
let poll = future.as_mut().poll(&mut context);
assert_eq!(poll, Poll::Pending);
// The clock must still read the starting instant.
assert_eq!(state.borrow().now, Some(start));
}
}