use super::{CaughtSignals, Clock, Errno, Fcntl, Read, Result, Select, Write};
use crate::io::Fd;
use crate::waker::{ScheduledWakerQueue, WakerSet};
use std::cell::{Cell, LazyCell, OnceCell, RefCell};
use std::collections::HashMap;
use std::future::poll_fn;
use std::ops::{Deref, DerefMut};
use std::pin::pin;
use std::rc::{Rc, Weak};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Waker};
use std::time::{Duration, Instant};
#[derive(Clone, Debug, Default)]
pub struct Concurrent<S> {
inner: S,
state: RefCell<State>,
}
#[derive(Clone, Debug, Default)]
struct State {
reads: HashMap<Fd, WakerSet>,
writes: HashMap<Fd, WakerSet>,
timeouts: ScheduledWakerQueue,
catches: WakerSet,
signals: Weak<SignalList>,
select_mask: Option<Vec<crate::signal::Number>>,
}
impl<S> Concurrent<S> {
#[must_use]
pub fn new(inner: S) -> Self {
let state = Default::default();
Self { inner, state }
}
}
impl<S> Read for Rc<Concurrent<S>>
where
S: Fcntl + Read,
{
fn read<'a>(
&self,
fd: Fd,
buffer: &'a mut [u8],
) -> impl Future<Output = Result<usize>> + use<'a, S> {
let this = Rc::clone(self);
async move {
let this = TemporaryNonBlockingGuard::new(&this, fd);
let waker = LazyCell::default();
loop {
match this.inner.read(fd, buffer).await {
#[allow(unreachable_patterns)]
Err(Errno::EAGAIN | Errno::EWOULDBLOCK | Errno::EINTR) => {
this.yield_for_read(fd, &waker).await
}
result => return result,
}
}
}
}
}
impl<S> Write for Rc<Concurrent<S>>
where
S: Fcntl + Write,
{
fn write<'a>(
&self,
fd: Fd,
buffer: &'a [u8],
) -> impl Future<Output = Result<usize>> + use<'a, S> {
let this = Rc::clone(self);
async move {
let this = TemporaryNonBlockingGuard::new(&this, fd);
let waker = LazyCell::default();
loop {
match this.inner.write(fd, buffer).await {
#[allow(unreachable_patterns)]
Err(Errno::EAGAIN | Errno::EWOULDBLOCK | Errno::EINTR) => {
this.yield_for_write(fd, &waker).await
}
result => return result,
}
}
}
}
}
impl<S> Concurrent<S> {
async fn yield_for_read<F>(&self, fd: Fd, waker: &LazyCell<Rc<Cell<Option<Waker>>>, F>)
where
F: FnOnce() -> Rc<Cell<Option<Waker>>>,
{
self.yield_once(fd, waker, |state| &mut state.reads).await
}
async fn yield_for_write<F>(&self, fd: Fd, waker: &LazyCell<Rc<Cell<Option<Waker>>>, F>)
where
F: FnOnce() -> Rc<Cell<Option<Waker>>>,
{
self.yield_once(fd, waker, |state| &mut state.writes).await
}
async fn yield_once<F, G>(
&self,
fd: Fd,
waker: &LazyCell<Rc<Cell<Option<Waker>>>, F>,
target: G,
) where
F: FnOnce() -> Rc<Cell<Option<Waker>>>,
G: Fn(&mut State) -> &mut HashMap<Fd, WakerSet>,
{
let mut first_time = true;
poll_fn(|context| {
if first_time {
first_time = false;
waker.set(Some(context.waker().clone()));
target(&mut self.state.borrow_mut())
.entry(fd)
.or_default()
.insert(Rc::downgrade(waker));
Pending
} else {
Ready(())
}
})
.await
}
}
impl<S> Concurrent<S>
where
S: Clock,
{
pub async fn sleep_until(&self, deadline: Instant) {
let waker: LazyCell<Rc<Cell<Option<Waker>>>> = LazyCell::default();
poll_fn(|context| {
if self.inner.now() >= deadline {
Ready(())
} else {
waker.set(Some(context.waker().clone()));
self.state
.borrow_mut()
.timeouts
.push(deadline, Rc::downgrade(&waker));
Pending
}
})
.await
}
pub async fn sleep(&self, duration: Duration) {
let now = self.inner.now();
let deadline = now + duration;
self.sleep_until(deadline).await;
}
}
impl<S> Concurrent<S> {
pub async fn wait_for_signals(&self) -> Rc<SignalList> {
let signals = {
let mut state = self.state.borrow_mut();
state.signals.upgrade().unwrap_or_else(|| {
let signals = Rc::new(SignalList::new());
state.signals = Rc::downgrade(&signals);
signals
})
};
let waker: LazyCell<Rc<Cell<Option<Waker>>>> = LazyCell::default();
poll_fn(|context| {
if signals.0.get().is_some() {
Ready(())
} else {
waker.set(Some(context.waker().clone()));
self.state
.borrow_mut()
.catches
.insert(Rc::downgrade(&waker));
Pending
}
})
.await;
signals
}
}
impl<S> Concurrent<S>
where
S: CaughtSignals + Clock + Select,
{
pub fn peek(&self) {
let select = pin!(self.select_impl(true));
let poll = select.poll(&mut Context::from_waker(Waker::noop()));
debug_assert_eq!(poll, Ready(()), "peek should not block");
}
pub async fn select(&self) {
self.select_impl(false).await;
}
#[allow(clippy::await_holding_refcell_ref)]
async fn select_impl(&self, peek: bool) {
let mut state = self.state.borrow_mut();
let mut readers = state.reads.keys().cloned().collect();
let mut writers = state.writes.keys().cloned().collect();
let timeout = if peek {
Some(Duration::ZERO)
} else {
state
.timeouts
.next_wake_time()
.map(|target| target.saturating_duration_since(self.inner.now()))
};
let signal_mask = (state.signals.strong_count() > 0)
.then(|| state.select_mask.as_deref())
.flatten();
let result = self
.inner
.select(&mut readers, &mut writers, timeout, signal_mask)
.await;
if result != Err(Errno::EINTR) {
wake_tasks_for_ready_fds(&mut state.reads, &readers);
wake_tasks_for_ready_fds(&mut state.writes, &writers);
}
if !state.timeouts.is_empty() {
state.timeouts.wake(self.inner.now());
}
if let Some(signal_list) = state.signals.upgrade() {
let signals = self.inner.caught_signals();
if !signals.is_empty() {
let set_result = signal_list.0.set(signals);
debug_assert_eq!(set_result, Ok(()), "SignalList should not be filled yet");
state.catches.wake_all();
state.signals = Weak::new();
}
}
}
}
fn wake_tasks_for_ready_fds(task_map: &mut HashMap<Fd, WakerSet>, ready_fds: &[Fd]) {
task_map.retain(|fd, wakers| {
if ready_fds.contains(fd) {
wakers.wake_all();
false
} else {
true
}
})
}
#[derive(Debug)]
struct TemporaryNonBlockingGuard<'a, S: Fcntl> {
system: &'a Concurrent<S>,
fd: Fd,
original_nonblocking: bool,
}
impl<'a, S: Fcntl> TemporaryNonBlockingGuard<'a, S> {
fn new(system: &'a Concurrent<S>, fd: Fd) -> Self {
Self {
system,
fd,
original_nonblocking: system.inner.get_and_set_nonblocking(fd, true) == Ok(true),
}
}
}
impl<'a, S: Fcntl> Drop for TemporaryNonBlockingGuard<'a, S> {
fn drop(&mut self) {
if !self.original_nonblocking {
self.system
.inner
.get_and_set_nonblocking(self.fd, false)
.ok();
}
}
}
impl<'a, S: Fcntl> Deref for TemporaryNonBlockingGuard<'a, S> {
type Target = Concurrent<S>;
fn deref(&self) -> &Self::Target {
self.system
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SignalList(OnceCell<Vec<crate::signal::Number>>);
impl Deref for SignalList {
type Target = Vec<crate::signal::Number>;
fn deref(&self) -> &Vec<crate::signal::Number> {
self.0.get().unwrap()
}
}
impl DerefMut for SignalList {
fn deref_mut(&mut self) -> &mut Vec<crate::signal::Number> {
self.0.get_mut().unwrap()
}
}
impl SignalList {
#[must_use]
fn new() -> Self {
Self(OnceCell::new())
}
pub fn into_vec(self) -> Vec<crate::signal::Number> {
self.0.into_inner().unwrap()
}
}
mod delegates;
mod run;
mod rw_all;
mod signal;
#[cfg(test)]
mod tests {
use super::super::{
Close as _, Disposition, Mode, OfdAccess, Open as _, OpenFlag, Pipe as _, SendSignal,
};
use super::*;
use crate::system::r#virtual::{PIPE_SIZE, SIGCHLD, SIGINT, SIGUSR2, VirtualSystem};
use crate::test_helper::WakeFlag;
use crate::trap::SignalSystem as _;
use assert_matches::assert_matches;
use futures_util::FutureExt as _;
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
#[test]
fn peek_with_no_conditions_returns_immediately() {
let system = Concurrent::new(VirtualSystem::new());
system.peek();
}
#[test]
fn select_with_no_conditions_never_completes() {
let system = Concurrent::new(VirtualSystem::new());
let future = pin!(system.select());
let wake_flag = Arc::new(WakeFlag::new());
let waker = Waker::from(wake_flag.clone());
let mut context = Context::from_waker(&waker);
assert_eq!(future.poll(&mut context), Pending);
assert!(!wake_flag.is_woken());
}
#[test]
fn regular_file_read_completes_immediately() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let fd = system
.open(
c"/foo",
OfdAccess::ReadOnly,
OpenFlag::Create.into(),
Mode::empty(),
)
.now_or_never()
.unwrap()
.unwrap();
let mut buffer = [0; 4];
let future = pin!(system.read(fd, &mut buffer));
let wake_flag = Arc::new(WakeFlag::new());
let waker = Waker::from(wake_flag.clone());
let mut context = Context::from_waker(&waker);
assert_eq!(future.poll(&mut context), Ready(Ok(0)));
assert!(!wake_flag.is_woken());
}
#[test]
fn pipe_read_becomes_ready_on_data_available() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let (read_fd, write_fd) = system.pipe().unwrap();
let mut buffer1 = [0; 4];
let mut buffer2 = [0; 4];
let mut read1 = pin!(system.read(read_fd, &mut buffer1));
let mut read2 = pin!(system.read(read_fd, &mut buffer2));
let wake_flag1 = Arc::new(WakeFlag::new());
let wake_flag2 = Arc::new(WakeFlag::new());
let waker1 = Waker::from(wake_flag1.clone());
let waker2 = Waker::from(wake_flag2.clone());
let mut context1 = Context::from_waker(&waker1);
let mut context2 = Context::from_waker(&waker2);
assert_eq!(read1.as_mut().poll(&mut context1), Pending);
assert_eq!(read2.as_mut().poll(&mut context2), Pending);
let mut select = pin!(system.select());
let mut context3 = Context::from_waker(Waker::noop());
assert_eq!(select.as_mut().poll(&mut context3), Pending);
assert!(!wake_flag1.is_woken());
assert!(!wake_flag2.is_woken());
system
.write(write_fd, &[1, 2, 3, 4])
.now_or_never()
.unwrap()
.unwrap();
assert_eq!(select.as_mut().poll(&mut context3), Ready(()));
assert!(wake_flag1.is_woken());
assert!(wake_flag2.is_woken());
}
#[test]
fn select_wakes_only_read_tasks_with_ready_fd() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let (read_fd1, write_fd1) = system.pipe().unwrap();
let (read_fd2, _write_fd2) = system.pipe().unwrap();
let mut buffer1 = [0; 4];
let mut buffer2 = [0; 4];
let mut read1 = pin!(system.read(read_fd1, &mut buffer1));
let mut read2 = pin!(system.read(read_fd2, &mut buffer2));
let wake_flag1 = Arc::new(WakeFlag::new());
let wake_flag2 = Arc::new(WakeFlag::new());
let waker1 = Waker::from(wake_flag1.clone());
let waker2 = Waker::from(wake_flag2.clone());
let mut context1 = Context::from_waker(&waker1);
let mut context2 = Context::from_waker(&waker2);
assert_eq!(read1.as_mut().poll(&mut context1), Pending);
assert_eq!(read2.as_mut().poll(&mut context2), Pending);
let mut select = pin!(system.select());
let mut context3 = Context::from_waker(Waker::noop());
assert_eq!(select.as_mut().poll(&mut context3), Pending);
assert!(!wake_flag1.is_woken());
assert!(!wake_flag2.is_woken());
system
.write(write_fd1, &[1, 2, 3, 4])
.now_or_never()
.unwrap()
.unwrap();
assert_eq!(select.as_mut().poll(&mut context3), Ready(()));
assert!(wake_flag1.is_woken());
assert!(!wake_flag2.is_woken());
}
#[test]
fn read_preserves_fd_blocking_mode() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let fd = system
.open(
c"/foo",
OfdAccess::ReadOnly,
OpenFlag::Create.into(),
Mode::empty(),
)
.now_or_never()
.unwrap()
.unwrap();
let mut buffer = [0; 4];
system
.read(fd, &mut buffer)
.now_or_never()
.unwrap()
.unwrap();
assert_eq!(system.inner.get_and_set_nonblocking(fd, false), Ok(false));
system.inner.get_and_set_nonblocking(fd, true).ok();
system
.read(fd, &mut buffer)
.now_or_never()
.unwrap()
.unwrap();
assert_eq!(system.inner.get_and_set_nonblocking(fd, true), Ok(true));
}
#[test]
fn regular_file_write_completes_immediately() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let fd = system
.open(
c"/foo",
OfdAccess::WriteOnly,
OpenFlag::Create.into(),
Mode::empty(),
)
.now_or_never()
.unwrap()
.unwrap();
let buffer = [1, 2, 3, 4];
let future = pin!(system.write(fd, &buffer));
let wake_flag = Arc::new(WakeFlag::new());
let waker = Waker::from(wake_flag.clone());
let mut context = Context::from_waker(&waker);
assert_eq!(future.poll(&mut context), Ready(Ok(4)));
assert!(!wake_flag.is_woken());
}
#[test]
fn pipe_write_becomes_ready_on_buffer_space() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let (read_fd, write_fd) = system.pipe().unwrap();
system
.write(write_fd, &[0; PIPE_SIZE])
.now_or_never()
.unwrap()
.unwrap();
let buffer1 = [1, 2, 3, 4];
let buffer2 = [5, 6, 7, 8];
let mut write1 = pin!(system.write(write_fd, &buffer1));
let mut write2 = pin!(system.write(write_fd, &buffer2));
let wake_flag1 = Arc::new(WakeFlag::new());
let wake_flag2 = Arc::new(WakeFlag::new());
let waker1 = Waker::from(wake_flag1.clone());
let waker2 = Waker::from(wake_flag2.clone());
let mut context1 = Context::from_waker(&waker1);
let mut context2 = Context::from_waker(&waker2);
assert_eq!(write1.as_mut().poll(&mut context1), Pending);
assert_eq!(write2.as_mut().poll(&mut context2), Pending);
let mut select = pin!(system.select());
let mut context3 = Context::from_waker(Waker::noop());
assert_eq!(select.as_mut().poll(&mut context3), Pending);
assert!(!wake_flag1.is_woken());
assert!(!wake_flag2.is_woken());
let mut read_buffer = [0; PIPE_SIZE];
system
.read(read_fd, &mut read_buffer)
.now_or_never()
.unwrap()
.unwrap();
assert_eq!(select.as_mut().poll(&mut context3), Ready(()));
assert!(wake_flag1.is_woken());
assert!(wake_flag2.is_woken());
}
#[test]
fn select_wakes_only_write_tasks_with_ready_fd() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let (read_fd1, write_fd1) = system.pipe().unwrap();
let (_read_fd2, write_fd2) = system.pipe().unwrap();
system
.write(write_fd1, &[0; PIPE_SIZE])
.now_or_never()
.unwrap()
.unwrap();
system
.write(write_fd2, &[0; PIPE_SIZE])
.now_or_never()
.unwrap()
.unwrap();
let buffer1 = [1, 2, 3, 4];
let buffer2 = [5, 6, 7, 8];
let mut write1 = pin!(system.write(write_fd1, &buffer1));
let mut write2 = pin!(system.write(write_fd2, &buffer2));
let wake_flag1 = Arc::new(WakeFlag::new());
let wake_flag2 = Arc::new(WakeFlag::new());
let waker1 = Waker::from(wake_flag1.clone());
let waker2 = Waker::from(wake_flag2.clone());
let mut context1 = Context::from_waker(&waker1);
let mut context2 = Context::from_waker(&waker2);
assert_eq!(write1.as_mut().poll(&mut context1), Pending);
assert_eq!(write2.as_mut().poll(&mut context2), Pending);
let mut select = pin!(system.select());
let mut context3 = Context::from_waker(Waker::noop());
assert_eq!(select.as_mut().poll(&mut context3), Pending);
assert!(!wake_flag1.is_woken());
assert!(!wake_flag2.is_woken());
let mut read_buffer = [0; PIPE_SIZE];
system
.read(read_fd1, &mut read_buffer)
.now_or_never()
.unwrap()
.unwrap();
assert_eq!(select.as_mut().poll(&mut context3), Ready(()));
assert!(wake_flag1.is_woken());
assert!(!wake_flag2.is_woken());
}
#[test]
fn write_preserves_fd_blocking_mode() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let fd = system
.open(
c"/foo",
OfdAccess::WriteOnly,
OpenFlag::Create.into(),
Mode::empty(),
)
.now_or_never()
.unwrap()
.unwrap();
let buffer = [1, 2, 3, 4];
system.write(fd, &buffer).now_or_never().unwrap().unwrap();
assert_eq!(system.inner.get_and_set_nonblocking(fd, false), Ok(false));
system.inner.get_and_set_nonblocking(fd, true).ok();
system.write(fd, &buffer).now_or_never().unwrap().unwrap();
assert_eq!(system.inner.get_and_set_nonblocking(fd, true), Ok(true));
}
#[test]
fn sleep_completes_after_duration() {
let system = VirtualSystem::new();
let state = system.state.clone();
let now = Instant::now();
state.borrow_mut().now = Some(now);
let system = Concurrent::new(system);
let mut sleep = pin!(system.sleep(Duration::from_secs(1)));
let wake_flag = Arc::new(WakeFlag::new());
let waker = Waker::from(wake_flag.clone());
let mut context = Context::from_waker(&waker);
assert_eq!(sleep.as_mut().poll(&mut context), Pending);
let mut select = pin!(system.select());
assert_eq!(select.as_mut().poll(&mut context), Pending);
assert!(!wake_flag.is_woken());
state
.borrow_mut()
.advance_time(now + Duration::from_secs(1));
assert_eq!(select.as_mut().poll(&mut context), Ready(()));
assert!(wake_flag.is_woken());
let wake_flag = Arc::new(WakeFlag::new());
let waker = Waker::from(wake_flag.clone());
let mut context = Context::from_waker(&waker);
assert_eq!(sleep.poll(&mut context), Ready(()));
assert!(!wake_flag.is_woken());
}
#[test]
fn signal_wait_completes_on_signal() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
system
.set_disposition(SIGINT, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
system
.set_disposition(SIGCHLD, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
system
.set_disposition(SIGUSR2, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
let mut wait = pin!(system.wait_for_signals());
let wake_flag = Arc::new(WakeFlag::new());
let waker = Waker::from(wake_flag.clone());
let mut context = Context::from_waker(&waker);
assert_eq!(wait.as_mut().poll(&mut context), Pending);
let mut select = pin!(system.select());
let mut null_context = Context::from_waker(Waker::noop());
assert_eq!(select.as_mut().poll(&mut null_context), Pending);
assert!(!wake_flag.is_woken());
system.raise(SIGINT).now_or_never().unwrap().unwrap();
system.raise(SIGCHLD).now_or_never().unwrap().unwrap();
assert_eq!(select.as_mut().poll(&mut null_context), Ready(()));
assert!(wake_flag.is_woken());
let wake_flag = Arc::new(WakeFlag::new());
let waker = Waker::from(wake_flag.clone());
let mut context = Context::from_waker(&waker);
assert_matches!(wait.poll(&mut context), Ready(signals) => {
assert_matches!(***signals, [SIGINT, SIGCHLD] | [SIGCHLD, SIGINT]);
});
}
#[test]
fn select_does_not_consume_caught_signals_until_tasks_are_waiting_for_signals() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let (read_fd, write_fd) = system.pipe().unwrap();
system
.set_disposition(SIGCHLD, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
system.raise(SIGCHLD).now_or_never().unwrap().unwrap();
let mut buffer = [0; 4];
let mut read = pin!(system.read(read_fd, &mut buffer));
let mut null_context = Context::from_waker(Waker::noop());
assert_eq!(read.as_mut().poll(&mut null_context), Pending);
system
.write(write_fd, b"foo")
.now_or_never()
.unwrap()
.unwrap();
system.select().now_or_never().unwrap();
let mut wait = pin!(system.wait_for_signals());
assert_eq!(wait.as_mut().poll(&mut null_context), Pending);
system.select().now_or_never().unwrap();
assert_matches!(wait.poll(&mut null_context), Ready(signals) => {
assert_eq!(**signals, &[SIGCHLD]);
});
}
#[test]
fn wait_for_signals_can_be_used_many_times() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
system
.set_disposition(SIGINT, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
system
.set_disposition(SIGCHLD, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
let mut wait1 = pin!(system.wait_for_signals());
let mut null_context = Context::from_waker(Waker::noop());
assert_eq!(wait1.as_mut().poll(&mut null_context), Pending);
system.raise(SIGCHLD).now_or_never().unwrap().unwrap();
system.select().now_or_never().unwrap();
let mut wait2 = pin!(system.wait_for_signals());
assert_eq!(wait2.as_mut().poll(&mut null_context), Pending);
system.raise(SIGINT).now_or_never().unwrap().unwrap();
system.select().now_or_never().unwrap();
assert_matches!(wait1.poll(&mut null_context), Ready(signals) => {
assert_eq!(**signals, &[SIGCHLD]);
});
assert_matches!(wait2.poll(&mut null_context), Ready(signals) => {
assert_eq!(**signals, &[SIGINT]);
});
}
#[test]
fn select_completes_when_any_condition_is_ready() {
let system = VirtualSystem::new();
let state = system.state.clone();
let now = Instant::now();
state.borrow_mut().now = Some(now);
let system = Rc::new(Concurrent::new(system));
let (read_fd, write_fd) = system.pipe().unwrap();
let mut buffer = [0; 4];
system
.set_disposition(SIGINT, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
let mut sleep = pin!(system.sleep(Duration::from_secs(3)));
let mut read = pin!(system.read(read_fd, &mut buffer));
let mut wait = pin!(system.wait_for_signals());
let wake_sleep = Arc::new(WakeFlag::new());
let wake_read = Arc::new(WakeFlag::new());
let wake_wait = Arc::new(WakeFlag::new());
let sleep_waker = Waker::from(wake_sleep.clone());
let read_waker = Waker::from(wake_read.clone());
let wait_waker = Waker::from(wake_wait.clone());
let mut sleep_context = Context::from_waker(&sleep_waker);
let mut read_context = Context::from_waker(&read_waker);
let mut wait_context = Context::from_waker(&wait_waker);
assert_eq!(sleep.as_mut().poll(&mut sleep_context), Pending);
assert_eq!(read.as_mut().poll(&mut read_context), Pending);
assert_eq!(wait.as_mut().poll(&mut wait_context), Pending);
let mut select = pin!(system.select());
let wake_select = Arc::new(WakeFlag::new());
let select_waker = Waker::from(wake_select.clone());
let mut select_context = Context::from_waker(&select_waker);
assert_eq!(select.as_mut().poll(&mut select_context), Pending);
assert!(!wake_sleep.is_woken());
assert!(!wake_read.is_woken());
assert!(!wake_wait.is_woken());
assert!(!wake_select.is_woken());
system
.write(write_fd, b"foo")
.now_or_never()
.unwrap()
.unwrap();
assert!(wake_select.is_woken());
let wake_select = Arc::new(WakeFlag::new());
let select_waker = Waker::from(wake_select.clone());
let mut select_context = Context::from_waker(&select_waker);
assert_eq!(select.as_mut().poll(&mut select_context), Ready(()));
assert!(!wake_sleep.is_woken());
assert!(wake_read.is_woken());
assert!(!wake_wait.is_woken());
assert!(!wake_select.is_woken());
assert_eq!(read.now_or_never().unwrap(), Ok(3));
let mut select = pin!(system.select());
let wake_select = Arc::new(WakeFlag::new());
let select_waker = Waker::from(wake_select.clone());
let mut select_context = Context::from_waker(&select_waker);
assert_eq!(select.as_mut().poll(&mut select_context), Pending);
assert!(!wake_sleep.is_woken());
assert!(!wake_wait.is_woken());
assert!(!wake_select.is_woken());
state
.borrow_mut()
.advance_time(now + Duration::from_secs(3));
assert!(wake_select.is_woken());
let wake_select = Arc::new(WakeFlag::new());
let select_waker = Waker::from(wake_select.clone());
let mut select_context = Context::from_waker(&select_waker);
assert_eq!(select.as_mut().poll(&mut select_context), Ready(()));
assert!(wake_sleep.is_woken());
assert!(!wake_wait.is_woken());
assert!(!wake_select.is_woken());
sleep.now_or_never().unwrap();
let mut select = pin!(system.select());
let wake_select = Arc::new(WakeFlag::new());
let select_waker = Waker::from(wake_select.clone());
let mut select_context = Context::from_waker(&select_waker);
assert_eq!(select.as_mut().poll(&mut select_context), Pending);
assert!(!wake_wait.is_woken());
assert!(!wake_select.is_woken());
system.raise(SIGINT).now_or_never().unwrap().unwrap();
assert!(wake_select.is_woken());
let wake_select = Arc::new(WakeFlag::new());
let select_waker = Waker::from(wake_select.clone());
let mut select_context = Context::from_waker(&select_waker);
assert_eq!(select.as_mut().poll(&mut select_context), Ready(()));
assert!(wake_wait.is_woken());
assert!(!wake_select.is_woken());
assert_eq!(**wait.now_or_never().unwrap(), &[SIGINT]);
}
#[test]
fn select_wakes_all_reads_and_writes_on_ebadf() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let (read_fd1, _write_fd1) = system.pipe().unwrap();
let (_read_fd2, write_fd2) = system.pipe().unwrap();
system
.write(write_fd2, &[0; PIPE_SIZE])
.now_or_never()
.unwrap()
.unwrap();
let mut read_buffer = [0; 4];
let mut read = pin!(system.read(read_fd1, &mut read_buffer));
let mut write = pin!(system.write(write_fd2, &[1, 2, 3, 4]));
let wake_flag1 = Arc::new(WakeFlag::new());
let wake_flag2 = Arc::new(WakeFlag::new());
let waker1 = Waker::from(wake_flag1.clone());
let waker2 = Waker::from(wake_flag2.clone());
let mut context1 = Context::from_waker(&waker1);
let mut context2 = Context::from_waker(&waker2);
assert_eq!(read.as_mut().poll(&mut context1), Pending);
assert_eq!(write.as_mut().poll(&mut context2), Pending);
let mut select = pin!(system.select());
let wake_select = Arc::new(WakeFlag::new());
let select_waker = Waker::from(wake_select.clone());
let mut select_context = Context::from_waker(&select_waker);
assert_eq!(select.as_mut().poll(&mut select_context), Pending);
assert!(!wake_flag1.is_woken());
assert!(!wake_flag2.is_woken());
assert!(!wake_select.is_woken());
system.close(read_fd1).unwrap();
assert!(wake_select.is_woken());
let wake_select = Arc::new(WakeFlag::new());
let select_waker = Waker::from(wake_select.clone());
let mut select_context = Context::from_waker(&select_waker);
assert_eq!(select.as_mut().poll(&mut select_context), Ready(()));
assert!(wake_flag1.is_woken());
assert!(wake_flag2.is_woken());
assert!(!wake_select.is_woken());
}
#[test]
fn select_does_not_wake_reads_or_writes_on_eintr() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let (read_fd1, _write_fd1) = system.pipe().unwrap();
let (_read_fd2, write_fd2) = system.pipe().unwrap();
system
.write(write_fd2, &[0; PIPE_SIZE])
.now_or_never()
.unwrap()
.unwrap();
system
.set_disposition(SIGUSR2, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
let mut read_buffer = [0; 4];
let mut read = pin!(system.read(read_fd1, &mut read_buffer));
let mut write = pin!(system.write(write_fd2, &[1]));
let mut wait = pin!(system.wait_for_signals());
let wake_flag1 = Arc::new(WakeFlag::new());
let wake_flag2 = Arc::new(WakeFlag::new());
let wake_flag3 = Arc::new(WakeFlag::new());
let waker1 = Waker::from(wake_flag1.clone());
let waker2 = Waker::from(wake_flag2.clone());
let waker3 = Waker::from(wake_flag3.clone());
let mut context1 = Context::from_waker(&waker1);
let mut context2 = Context::from_waker(&waker2);
let mut context3 = Context::from_waker(&waker3);
assert_eq!(read.as_mut().poll(&mut context1), Pending);
assert_eq!(write.as_mut().poll(&mut context2), Pending);
assert_eq!(wait.as_mut().poll(&mut context3), Pending);
assert!(!wake_flag1.is_woken());
assert!(!wake_flag2.is_woken());
assert!(!wake_flag3.is_woken());
system.raise(SIGUSR2).now_or_never().unwrap().unwrap();
let mut select_fut = pin!(system.select());
let mut context4 = Context::from_waker(Waker::noop());
assert_eq!(select_fut.as_mut().poll(&mut context4), Ready(()));
assert!(!wake_flag1.is_woken());
assert!(!wake_flag2.is_woken());
}
#[test]
fn signal_wait_is_made_ready_by_peek_after_caught() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
system
.set_disposition(SIGINT, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
system
.set_disposition(SIGCHLD, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
system
.set_disposition(SIGUSR2, Disposition::Catch)
.now_or_never()
.unwrap()
.unwrap();
let mut wait = pin!(system.wait_for_signals());
let wake_flag = Arc::new(WakeFlag::new());
let waker = Waker::from(wake_flag.clone());
let mut context = Context::from_waker(&waker);
assert_eq!(wait.as_mut().poll(&mut context), Pending);
system.peek();
assert!(!wake_flag.is_woken());
system.raise(SIGINT).now_or_never().unwrap().unwrap();
system.raise(SIGCHLD).now_or_never().unwrap().unwrap();
system.peek();
assert!(wake_flag.is_woken());
let wake_flag = Arc::new(WakeFlag::new());
let waker = Waker::from(wake_flag.clone());
let mut context = Context::from_waker(&waker);
assert_matches!(wait.poll(&mut context), Ready(signals) => {
assert_matches!(***signals, [SIGINT, SIGCHLD] | [SIGCHLD, SIGINT]);
});
}
}