yash_env/system/concurrency/
run_virtual.rs1use super::super::r#virtual::VirtualSystem;
20use super::Concurrent;
21use super::RunLoop;
22use super::Select as _;
23use crate::job::ProcessState;
24use futures_util::{pending, poll};
25use std::pin::pin;
26
27impl Concurrent<VirtualSystem> {
28 pub async fn run_virtual<F>(&self, task: F)
43 where
44 F: Future<Output = ()>,
45 {
46 let mut task = pin!(task);
47 while poll!(&mut task).is_pending() {
48 let state = self.inner.current_process().state();
49 match state {
50 ProcessState::Running => {
51 }
54 ProcessState::Halted(result) => {
55 if result.is_stopped() {
56 let terminated = self.inner.block_while_stopped().await;
58 if !terminated {
59 continue;
61 }
62 }
63 return;
65 }
66 }
67
68 let mut select = pin!(self.select());
69 while poll!(&mut select).is_pending() {
70 let state = self.inner.current_process().state();
71 match state {
72 ProcessState::Running => {
73 pending!()
77 }
78 ProcessState::Halted(result) => {
79 if result.is_stopped() {
80 let terminated = self.inner.block_while_stopped().await;
82 if !terminated {
83 continue;
86 }
87 }
88 return;
90 }
91 }
92 }
93 }
94 }
95}
96
97impl RunLoop for VirtualSystem {
99 #[inline(always)]
101 fn run_loop<'c, F>(
102 concurrent: &'c Concurrent<Self>,
103 task: F,
104 ) -> impl Future<Output = ()> + use<'c, F>
105 where
106 F: Future<Output = ()>,
107 {
108 concurrent.run_virtual(task)
109 }
110}
111
#[cfg(test)]
mod tests {
    use super::super::Sleep as _;
    use super::*;
    use crate::semantics::ExitStatus;
    use crate::system::r#virtual::{SIGCONT, SIGKILL, SIGSTOP};
    use crate::system::{Exit as _, SendSignal as _};
    use crate::test_helper::WakeFlag;
    use futures_util::FutureExt as _;
    use std::cell::Cell;
    use std::rc::Rc;
    use std::sync::Arc;
    use std::task::Poll::{Pending, Ready};
    use std::task::{Context, Waker};
    use std::time::{Duration, Instant};

    /// Guard that sets the shared flag to `true` when it is dropped, letting a
    /// test observe that the task owning it has been dropped.
    struct DropFlag(Rc<Cell<bool>>);

    impl Drop for DropFlag {
        fn drop(&mut self) {
            let Self(flag) = self;
            flag.set(true);
        }
    }

    /// Creates a virtual system whose clock is set to the returned instant.
    fn virtual_system_with_current_time() -> (Concurrent<VirtualSystem>, Instant) {
        let start = Instant::now();
        let virtual_system = VirtualSystem::new();
        virtual_system.state.borrow_mut().now = Some(start);
        (Concurrent::new(virtual_system), start)
    }

    /// Builds a task context whose waker does nothing.
    fn noop_context() -> Context<'static> {
        Context::from_waker(Waker::noop())
    }

    /// Moves the virtual clock of `system` to `instant`.
    fn advance_clock(system: &Concurrent<VirtualSystem>, instant: Instant) {
        system.inner.state.borrow_mut().advance_time(instant);
    }

    /// Overwrites the state of the current process of `system`.
    fn force_process_state(system: &Concurrent<VirtualSystem>, state: ProcessState) {
        let _ = system.inner.current_process_mut().set_state(state);
    }

    /// Delivers `SIGKILL` to the current process of `system`.
    fn kill_current_process(system: &Concurrent<VirtualSystem>) {
        let _ = system.inner.current_process_mut().raise_signal(SIGKILL);
    }

    #[test]
    fn run_virtual_returns_immediately_when_task_is_ready_on_first_poll() {
        let system = Concurrent::new(VirtualSystem::new());
        let done = Cell::new(false);

        let outcome = system.run_virtual(async { done.set(true) }).now_or_never();

        assert_eq!(outcome, Some(()));
        assert!(done.get());
    }

    #[test]
    fn run_virtual_completes_normally_when_task_alternates_between_pending_and_ready() {
        let (system, start) = virtual_system_with_current_time();
        let step = Rc::new(Cell::new(0));
        let step_in_task = Rc::clone(&step);
        let mut run = pin!(system.run_virtual(async {
            step_in_task.set(1);
            system.sleep(Duration::from_secs(1)).await;
            step_in_task.set(2);
            system.sleep(Duration::from_secs(1)).await;
            step_in_task.set(3);
        }));

        let mut cx = noop_context();
        assert_eq!(run.as_mut().poll(&mut cx), Pending);
        assert_eq!(step.get(), 1);

        advance_clock(&system, start + Duration::from_secs(1));
        assert_eq!(run.as_mut().poll(&mut cx), Pending);
        assert_eq!(step.get(), 2);

        advance_clock(&system, start + Duration::from_secs(2));
        assert_eq!(run.as_mut().poll(&mut cx), Ready(()));
        assert_eq!(step.get(), 3);
    }

    #[test]
    fn run_virtual_waits_on_select_while_process_is_running_and_task_is_pending() {
        let (system, start) = virtual_system_with_current_time();
        let done = Rc::new(Cell::new(false));
        let done_in_task = Rc::clone(&done);
        let mut run = pin!(system.run_virtual(async {
            system.sleep(Duration::from_secs(1)).await;
            done_in_task.set(true);
        }));

        // First poll: the task starts sleeping and nobody has been woken yet.
        let first_flag = Arc::new(WakeFlag::new());
        let first_waker = Waker::from(Arc::clone(&first_flag));
        let mut first_cx = Context::from_waker(&first_waker);
        assert_eq!(run.as_mut().poll(&mut first_cx), Pending);
        assert!(!done.get());
        assert!(!first_flag.is_woken());

        // Reaching the deadline must wake the waker registered above.
        advance_clock(&system, start + Duration::from_secs(1));
        assert!(first_flag.is_woken());

        // Second poll with a fresh waker: the task finishes without waking it.
        let second_flag = Arc::new(WakeFlag::new());
        let second_waker = Waker::from(Arc::clone(&second_flag));
        let mut second_cx = Context::from_waker(&second_waker);
        assert_eq!(run.as_mut().poll(&mut second_cx), Ready(()));
        assert!(done.get());
        assert!(!second_flag.is_woken());
    }

    #[test]
    fn run_virtual_yields_pending_to_caller_while_waiting_on_pending_select_in_running_state() {
        let (system, _start) = virtual_system_with_current_time();
        let done = Rc::new(Cell::new(false));
        let done_in_task = Rc::clone(&done);
        let mut run = pin!(system.run_virtual(async {
            system.sleep(Duration::from_secs(1)).await;
            done_in_task.set(true);
        }));

        let mut cx = noop_context();
        assert_eq!(run.as_mut().poll(&mut cx), Pending);
        assert_eq!(run.as_mut().poll(&mut cx), Pending);
        assert!(!done.get());
    }

    #[test]
    fn run_virtual_aborts_task_when_process_is_already_terminated_before_entering_select() {
        let system = Concurrent::new(VirtualSystem::new());
        let task_dropped = Rc::new(Cell::new(false));
        let flag_for_task = Rc::clone(&task_dropped);
        let mut run = pin!(system.run_virtual(async {
            let _guard = DropFlag(flag_for_task);
            system.exit(ExitStatus(42)).await;
        }));

        let mut cx = noop_context();
        assert_eq!(run.as_mut().poll(&mut cx), Ready(()));
        assert!(task_dropped.get());
    }

    #[test]
    fn run_virtual_blocks_while_stopped_before_select_and_resumes_task_when_process_is_continued() {
        let system = Concurrent::new(VirtualSystem::new());
        let done = Rc::new(Cell::new(false));
        let mut run = pin!(system.run_virtual(async {
            system.raise(SIGSTOP).await.unwrap();
            done.set(true);
        }));

        let mut cx = noop_context();
        assert_eq!(run.as_mut().poll(&mut cx), Pending);
        assert_eq!(
            system.inner.current_process().state(),
            ProcessState::stopped(SIGSTOP),
        );
        assert!(!done.get());

        let _ = system.inner.current_process_mut().raise_signal(SIGCONT);
        assert_eq!(run.as_mut().poll(&mut cx), Ready(()));
        assert!(done.get());
    }

    #[test]
    fn run_virtual_blocks_while_stopped_before_select_and_aborts_when_process_terminates() {
        let system = Concurrent::new(VirtualSystem::new());
        let task_dropped = Rc::new(Cell::new(false));
        let flag_for_task = Rc::clone(&task_dropped);
        let mut run = pin!(system.run_virtual(async {
            let _guard = DropFlag(flag_for_task);
            system.raise(SIGSTOP).await.unwrap();
            unreachable!("task should be aborted while stopped");
        }));

        let mut cx = noop_context();
        assert_eq!(run.as_mut().poll(&mut cx), Pending);
        assert!(!task_dropped.get());

        kill_current_process(&system);
        assert_eq!(run.as_mut().poll(&mut cx), Ready(()));
        assert!(task_dropped.get());
    }

    #[test]
    fn run_virtual_blocks_while_stopped_during_pending_select_and_continues_waiting_after_resume() {
        let (system, start) = virtual_system_with_current_time();
        let done = Rc::new(Cell::new(false));
        let done_in_task = Rc::clone(&done);
        let mut run = pin!(system.run_virtual(async {
            system.sleep(Duration::from_secs(1)).await;
            done_in_task.set(true);
        }));

        let mut cx = noop_context();
        assert_eq!(run.as_mut().poll(&mut cx), Pending);

        // Stop the process while the run loop is waiting on select.
        force_process_state(&system, ProcessState::stopped(SIGSTOP));
        assert_eq!(run.as_mut().poll(&mut cx), Pending);
        assert!(!done.get());

        // The deadline passing must not let the task run while stopped.
        advance_clock(&system, start + Duration::from_secs(1));
        assert_eq!(run.as_mut().poll(&mut cx), Pending);
        assert!(!done.get());

        // Once running again, the task observes the elapsed sleep and finishes.
        force_process_state(&system, ProcessState::Running);
        assert_eq!(run.as_mut().poll(&mut cx), Ready(()));
        assert!(done.get());
    }

    #[test]
    fn run_virtual_blocks_while_stopped_during_pending_select_and_aborts_when_terminated() {
        let (system, _start) = virtual_system_with_current_time();
        let task_dropped = Rc::new(Cell::new(false));
        let mut run = pin!(system.run_virtual(async {
            let _guard = DropFlag(Rc::clone(&task_dropped));
            system.sleep(Duration::from_secs(1)).await;
            unreachable!("task should be aborted while sleeping");
        }));

        let mut cx = noop_context();
        assert_eq!(run.as_mut().poll(&mut cx), Pending);

        force_process_state(&system, ProcessState::stopped(SIGSTOP));
        assert_eq!(run.as_mut().poll(&mut cx), Pending);
        assert!(!task_dropped.get());

        kill_current_process(&system);
        assert_eq!(run.as_mut().poll(&mut cx), Ready(()));
        assert!(task_dropped.get());
    }

    #[test]
    fn run_virtual_aborts_immediately_when_process_becomes_terminated_while_waiting_on_pending_select()
     {
        let (system, _start) = virtual_system_with_current_time();
        let task_dropped = Rc::new(Cell::new(false));
        let mut run = pin!(system.run_virtual(async {
            let _guard = DropFlag(Rc::clone(&task_dropped));
            system.sleep(Duration::from_secs(1)).await;
            unreachable!("task should be aborted while sleeping");
        }));

        let mut cx = noop_context();
        assert_eq!(run.as_mut().poll(&mut cx), Pending);
        assert!(!task_dropped.get());

        kill_current_process(&system);
        assert_eq!(run.as_mut().poll(&mut cx), Ready(()));
        assert!(task_dropped.get());
    }
}