use super::super::r#virtual::VirtualSystem;
use super::Concurrent;
use super::RunLoop;
use super::Select as _;
use crate::job::ProcessState;
use futures_util::{pending, poll};
use std::pin::pin;
impl Concurrent<VirtualSystem> {
pub async fn run_virtual<F>(&self, task: F)
where
F: Future<Output = ()>,
{
let mut task = pin!(task);
while poll!(&mut task).is_pending() {
let state = self.inner.current_process().state();
match state {
ProcessState::Running => {
}
ProcessState::Halted(result) => {
if result.is_stopped() {
let terminated = self.inner.block_while_stopped().await;
if !terminated {
continue;
}
}
return;
}
}
let mut select = pin!(self.select());
while poll!(&mut select).is_pending() {
let state = self.inner.current_process().state();
match state {
ProcessState::Running => {
pending!()
}
ProcessState::Halted(result) => {
if result.is_stopped() {
let terminated = self.inner.block_while_stopped().await;
if !terminated {
continue;
}
}
return;
}
}
}
}
}
}
impl RunLoop for VirtualSystem {
#[inline(always)]
fn run_loop<'c, F>(
concurrent: &'c Concurrent<Self>,
task: F,
) -> impl Future<Output = ()> + use<'c, F>
where
F: Future<Output = ()>,
{
concurrent.run_virtual(task)
}
}
#[cfg(test)]
mod tests {
use super::super::Sleep as _;
use super::*;
use crate::semantics::ExitStatus;
use crate::system::r#virtual::{SIGCONT, SIGKILL, SIGSTOP};
use crate::system::{Exit as _, SendSignal as _};
use crate::test_helper::WakeFlag;
use futures_util::FutureExt as _;
use std::cell::Cell;
use std::rc::Rc;
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Waker};
use std::time::{Duration, Instant};
struct DropFlag(Rc<Cell<bool>>);
impl Drop for DropFlag {
fn drop(&mut self) {
self.0.set(true);
}
}
fn virtual_system_with_current_time() -> (Concurrent<VirtualSystem>, Instant) {
let inner = VirtualSystem::new();
let now = Instant::now();
inner.state.borrow_mut().now = Some(now);
(Concurrent::new(inner), now)
}
#[test]
fn run_virtual_returns_immediately_when_task_is_ready_on_first_poll() {
let system = Concurrent::new(VirtualSystem::new());
let completed = Cell::new(false);
let result = system
.run_virtual(async { completed.set(true) })
.now_or_never();
assert_eq!(result, Some(()));
assert!(completed.get());
}
#[test]
fn run_virtual_completes_normally_when_task_alternates_between_pending_and_ready() {
let (system, now) = virtual_system_with_current_time();
let progress = Rc::new(Cell::new(0));
let progress_2 = Rc::clone(&progress);
let mut future = pin!(system.run_virtual(async {
progress_2.set(1);
system.sleep(Duration::from_secs(1)).await;
progress_2.set(2);
system.sleep(Duration::from_secs(1)).await;
progress_2.set(3);
}));
let mut context = Context::from_waker(Waker::noop());
assert_eq!(future.as_mut().poll(&mut context), Pending);
assert_eq!(progress.get(), 1);
system
.inner
.state
.borrow_mut()
.advance_time(now + Duration::from_secs(1));
assert_eq!(future.as_mut().poll(&mut context), Pending);
assert_eq!(progress.get(), 2);
system
.inner
.state
.borrow_mut()
.advance_time(now + Duration::from_secs(2));
assert_eq!(future.as_mut().poll(&mut context), Ready(()));
assert_eq!(progress.get(), 3);
}
#[test]
fn run_virtual_waits_on_select_while_process_is_running_and_task_is_pending() {
let (system, now) = virtual_system_with_current_time();
let completed = Rc::new(Cell::new(false));
let completed_2 = Rc::clone(&completed);
let mut future = pin!(system.run_virtual(async {
system.sleep(Duration::from_secs(1)).await;
completed_2.set(true);
}));
let wake_flag = Arc::new(WakeFlag::new());
let waker = Waker::from(Arc::clone(&wake_flag));
let mut context = Context::from_waker(&waker);
assert_eq!(future.as_mut().poll(&mut context), Pending);
assert!(!completed.get());
assert!(!wake_flag.is_woken());
system
.inner
.state
.borrow_mut()
.advance_time(now + Duration::from_secs(1));
assert!(wake_flag.is_woken());
let wake_flag = Arc::new(WakeFlag::new());
let waker = Waker::from(Arc::clone(&wake_flag));
let mut context = Context::from_waker(&waker);
assert_eq!(future.as_mut().poll(&mut context), Ready(()));
assert!(completed.get());
assert!(!wake_flag.is_woken());
}
#[test]
fn run_virtual_yields_pending_to_caller_while_waiting_on_pending_select_in_running_state() {
let (system, _now) = virtual_system_with_current_time();
let completed = Rc::new(Cell::new(false));
let completed_2 = Rc::clone(&completed);
let mut future = pin!(system.run_virtual(async {
system.sleep(Duration::from_secs(1)).await;
completed_2.set(true);
}));
let mut context = Context::from_waker(Waker::noop());
assert_eq!(future.as_mut().poll(&mut context), Pending);
assert_eq!(future.as_mut().poll(&mut context), Pending);
assert!(!completed.get());
}
#[test]
fn run_virtual_aborts_task_when_process_is_already_terminated_before_entering_select() {
let system = Concurrent::new(VirtualSystem::new());
let dropped = Rc::new(Cell::new(false));
let dropped_2 = Rc::clone(&dropped);
let mut future = pin!(system.run_virtual(async {
let _drop_flag = DropFlag(dropped_2);
system.exit(ExitStatus(42)).await;
}));
let mut context = Context::from_waker(Waker::noop());
assert_eq!(future.as_mut().poll(&mut context), Ready(()));
assert!(dropped.get());
}
#[test]
fn run_virtual_blocks_while_stopped_before_select_and_resumes_task_when_process_is_continued() {
let system = Concurrent::new(VirtualSystem::new());
let completed = Rc::new(Cell::new(false));
let mut future = pin!(system.run_virtual(async {
system.raise(SIGSTOP).await.unwrap();
completed.set(true);
}));
let mut context = Context::from_waker(Waker::noop());
assert_eq!(future.as_mut().poll(&mut context), Pending);
assert_eq!(
system.inner.current_process().state(),
ProcessState::stopped(SIGSTOP),
);
assert!(!completed.get());
_ = system.inner.current_process_mut().raise_signal(SIGCONT);
assert_eq!(future.as_mut().poll(&mut context), Ready(()));
assert!(completed.get());
}
#[test]
fn run_virtual_blocks_while_stopped_before_select_and_aborts_when_process_terminates() {
let system = Concurrent::new(VirtualSystem::new());
let dropped = Rc::new(Cell::new(false));
let dropped_2 = Rc::clone(&dropped);
let mut future = pin!(system.run_virtual(async {
let _drop_flag = DropFlag(dropped_2);
system.raise(SIGSTOP).await.unwrap();
unreachable!("task should be aborted while stopped");
}));
let mut context = Context::from_waker(Waker::noop());
assert_eq!(future.as_mut().poll(&mut context), Pending);
assert!(!dropped.get());
_ = system.inner.current_process_mut().raise_signal(SIGKILL);
assert_eq!(future.as_mut().poll(&mut context), Ready(()));
assert!(dropped.get());
}
#[test]
fn run_virtual_blocks_while_stopped_during_pending_select_and_continues_waiting_after_resume() {
let (system, now) = virtual_system_with_current_time();
let completed = Rc::new(Cell::new(false));
let completed_2 = Rc::clone(&completed);
let mut future = pin!(system.run_virtual(async {
system.sleep(Duration::from_secs(1)).await;
completed_2.set(true);
}));
let mut context = Context::from_waker(Waker::noop());
assert_eq!(future.as_mut().poll(&mut context), Pending);
_ = system
.inner
.current_process_mut()
.set_state(ProcessState::stopped(SIGSTOP));
assert_eq!(future.as_mut().poll(&mut context), Pending);
assert!(!completed.get());
system
.inner
.state
.borrow_mut()
.advance_time(now + Duration::from_secs(1));
assert_eq!(future.as_mut().poll(&mut context), Pending);
assert!(!completed.get());
_ = system
.inner
.current_process_mut()
.set_state(ProcessState::Running);
assert_eq!(future.as_mut().poll(&mut context), Ready(()));
assert!(completed.get());
}
#[test]
fn run_virtual_blocks_while_stopped_during_pending_select_and_aborts_when_terminated() {
let (system, _now) = virtual_system_with_current_time();
let dropped = Rc::new(Cell::new(false));
let mut future = pin!(system.run_virtual(async {
let _drop_flag = DropFlag(Rc::clone(&dropped));
system.sleep(Duration::from_secs(1)).await;
unreachable!("task should be aborted while sleeping");
}));
let mut context = Context::from_waker(Waker::noop());
assert_eq!(future.as_mut().poll(&mut context), Pending);
_ = system
.inner
.current_process_mut()
.set_state(ProcessState::stopped(SIGSTOP));
assert_eq!(future.as_mut().poll(&mut context), Pending);
assert!(!dropped.get());
_ = system.inner.current_process_mut().raise_signal(SIGKILL);
assert_eq!(future.as_mut().poll(&mut context), Ready(()));
assert!(dropped.get());
}
#[test]
fn run_virtual_aborts_immediately_when_process_becomes_terminated_while_waiting_on_pending_select()
{
let (system, _now) = virtual_system_with_current_time();
let dropped = Rc::new(Cell::new(false));
let mut future = pin!(system.run_virtual(async {
let _drop_flag = DropFlag(Rc::clone(&dropped));
system.sleep(Duration::from_secs(1)).await;
unreachable!("task should be aborted while sleeping");
}));
let mut context = Context::from_waker(Waker::noop());
assert_eq!(future.as_mut().poll(&mut context), Pending);
assert!(!dropped.get());
_ = system.inner.current_process_mut().raise_signal(SIGKILL);
assert_eq!(future.as_mut().poll(&mut context), Ready(()));
assert!(dropped.get());
}
}