// yash_env/system/concurrency/run.rs
#[cfg(unix)]
20use super::super::real::RealSystem;
21use super::super::r#virtual::VirtualSystem;
22use super::Concurrent;
23use crate::job::ProcessState;
24use futures_util::{pending, poll};
25use std::pin::pin;
26
27#[cfg(unix)]
28impl Concurrent<RealSystem> {
29 pub fn run_real<F, T>(&self, task: F) -> T
45 where
46 F: Future<Output = T>,
47 {
48 use std::task::Poll::{Pending, Ready};
49 use std::task::{Context, Waker};
50
51 let runner = pin!(async move {
52 let mut task = pin!(task);
53 loop {
54 if let Ready(result) = poll!(&mut task) {
55 return result;
56 }
57 self.select().await;
58 }
59 });
60 match runner.poll(&mut Context::from_waker(Waker::noop())) {
61 Ready(result) => result,
62 Pending => unreachable!("`RealSystem::select` should never return `Pending`"),
63 }
64 }
65}
66
67impl Concurrent<VirtualSystem> {
68 pub async fn run_virtual<F>(&self, task: F)
83 where
84 F: Future<Output = ()>,
85 {
86 let mut task = pin!(task);
87 while poll!(&mut task).is_pending() {
88 let state = self.inner.current_process().state();
89 match state {
90 ProcessState::Running => {
91 }
94 ProcessState::Halted(result) => {
95 if result.is_stopped() {
96 let terminated = self.inner.block_while_stopped().await;
98 if !terminated {
99 continue;
101 }
102 }
103 return;
105 }
106 }
107
108 let mut select = pin!(self.select());
109 while poll!(&mut select).is_pending() {
110 let state = self.inner.current_process().state();
111 match state {
112 ProcessState::Running => {
113 pending!()
117 }
118 ProcessState::Halted(result) => {
119 if result.is_stopped() {
120 let terminated = self.inner.block_while_stopped().await;
122 if !terminated {
123 continue;
126 }
127 }
128 return;
130 }
131 }
132 }
133 }
134 }
135}
136
#[cfg(test)]
mod tests {
    use super::*;

    /// Tests for `Concurrent::<RealSystem>::run_real`.
    #[cfg(unix)]
    mod real_system {
        use super::*;
        use std::cell::Cell;
        use std::time::Duration;

        /// Creates a `Concurrent` wrapper around a fresh `RealSystem`.
        fn new_real_system() -> Concurrent<RealSystem> {
            // SAFETY: NOTE(review) the safety contract of `RealSystem::new` is
            // not visible in this file; confirm it holds for test usage.
            Concurrent::new(unsafe { RealSystem::new() })
        }

        #[test]
        fn run_real_returns_task_output_immediately_if_ready_on_first_poll() {
            let system = new_real_system();
            let output = system.run_real(async { 42 });
            assert_eq!(output, 42);
        }

        #[test]
        fn run_real_keeps_polling_task_until_completion_when_task_yields_multiple_times() {
            let system = new_real_system();
            let stage = Cell::new(0);

            let output = system.run_real(async {
                stage.set(1);
                system.sleep(Duration::from_millis(1)).await;
                stage.set(2);
                system.sleep(Duration::from_millis(1)).await;
                stage.set(3);
                42
            });

            assert_eq!(output, 42);
            assert_eq!(stage.get(), 3);
        }

        #[test]
        fn run_real_calls_select_between_task_polls_while_task_is_pending() {
            let system = new_real_system();
            let stage = Cell::new(0);

            let output = system.run_real(async {
                stage.set(1);
                system.sleep(Duration::from_millis(1)).await;
                stage.set(2);
                7
            });

            assert_eq!(output, 7);
            assert_eq!(stage.get(), 2);
        }

        #[test]
        #[should_panic = "boom"]
        fn run_real_propagates_task_panic_to_caller() {
            let system = new_real_system();
            system.run_real(async { panic!("boom") })
        }
    }

    /// Tests for `Concurrent::<VirtualSystem>::run_virtual`.
    mod virtual_system {
        use super::*;
        use crate::semantics::ExitStatus;
        use crate::system::r#virtual::{SIGCONT, SIGKILL, SIGSTOP};
        use crate::system::{Exit as _, SendSignal as _};
        use crate::test_helper::WakeFlag;
        use futures_util::FutureExt as _;
        use std::cell::Cell;
        use std::rc::Rc;
        use std::sync::Arc;
        use std::task::Poll::{Pending, Ready};
        use std::task::{Context, Waker};
        use std::time::{Duration, Instant};

        /// Sets the shared flag to `true` when dropped, so a test can observe
        /// that the future owning it has been dropped.
        struct DropFlag(Rc<Cell<bool>>);

        impl Drop for DropFlag {
            fn drop(&mut self) {
                self.0.set(true);
            }
        }

        /// Creates a virtual system whose clock is set to the current instant
        /// and returns it together with that instant.
        fn virtual_system_with_current_time() -> (Concurrent<VirtualSystem>, Instant) {
            let now = Instant::now();
            let inner = VirtualSystem::new();
            inner.state.borrow_mut().now = Some(now);
            (Concurrent::new(inner), now)
        }

        /// Advances the clock of the virtual system to `instant`.
        fn advance_time_to(system: &Concurrent<VirtualSystem>, instant: Instant) {
            system.inner.state.borrow_mut().advance_time(instant);
        }

        /// Creates a fresh wake flag together with a waker that reports to it.
        fn waker_with_flag() -> (Arc<WakeFlag>, Waker) {
            let flag = Arc::new(WakeFlag::new());
            let waker = Waker::from(Arc::clone(&flag));
            (flag, waker)
        }

        #[test]
        fn run_virtual_returns_immediately_when_task_is_ready_on_first_poll() {
            let system = Concurrent::new(VirtualSystem::new());
            let completed = Cell::new(false);

            let outcome = system
                .run_virtual(async { completed.set(true) })
                .now_or_never();

            assert_eq!(outcome, Some(()));
            assert!(completed.get());
        }

        #[test]
        fn run_virtual_completes_normally_when_task_alternates_between_pending_and_ready() {
            let (system, now) = virtual_system_with_current_time();
            let stage = Rc::new(Cell::new(0));
            let stage_in_task = Rc::clone(&stage);
            let mut run = pin!(system.run_virtual(async {
                stage_in_task.set(1);
                system.sleep(Duration::from_secs(1)).await;
                stage_in_task.set(2);
                system.sleep(Duration::from_secs(1)).await;
                stage_in_task.set(3);
            }));

            let mut cx = Context::from_waker(Waker::noop());
            assert_eq!(run.as_mut().poll(&mut cx), Pending);
            assert_eq!(stage.get(), 1);

            // The first sleep elapses.
            advance_time_to(&system, now + Duration::from_secs(1));
            assert_eq!(run.as_mut().poll(&mut cx), Pending);
            assert_eq!(stage.get(), 2);

            // The second sleep elapses.
            advance_time_to(&system, now + Duration::from_secs(2));
            assert_eq!(run.as_mut().poll(&mut cx), Ready(()));
            assert_eq!(stage.get(), 3);
        }

        #[test]
        fn run_virtual_waits_on_select_while_process_is_running_and_task_is_pending() {
            let (system, now) = virtual_system_with_current_time();
            let completed = Rc::new(Cell::new(false));
            let completed_in_task = Rc::clone(&completed);
            let mut run = pin!(system.run_virtual(async {
                system.sleep(Duration::from_secs(1)).await;
                completed_in_task.set(true);
            }));

            let (first_flag, first_waker) = waker_with_flag();
            let mut first_cx = Context::from_waker(&first_waker);
            assert_eq!(run.as_mut().poll(&mut first_cx), Pending);
            assert!(!completed.get());
            assert!(!first_flag.is_woken());

            // Advancing the clock past the deadline must wake the first waker.
            advance_time_to(&system, now + Duration::from_secs(1));
            assert!(first_flag.is_woken());

            let (second_flag, second_waker) = waker_with_flag();
            let mut second_cx = Context::from_waker(&second_waker);
            assert_eq!(run.as_mut().poll(&mut second_cx), Ready(()));
            assert!(completed.get());
            assert!(!second_flag.is_woken());
        }

        #[test]
        fn run_virtual_yields_pending_to_caller_while_waiting_on_pending_select_in_running_state() {
            let (system, _now) = virtual_system_with_current_time();
            let completed = Rc::new(Cell::new(false));
            let completed_in_task = Rc::clone(&completed);
            let mut run = pin!(system.run_virtual(async {
                system.sleep(Duration::from_secs(1)).await;
                completed_in_task.set(true);
            }));

            // Time never advances, so every poll must stay pending.
            let mut cx = Context::from_waker(Waker::noop());
            assert_eq!(run.as_mut().poll(&mut cx), Pending);
            assert_eq!(run.as_mut().poll(&mut cx), Pending);
            assert!(!completed.get());
        }

        #[test]
        fn run_virtual_aborts_task_when_process_is_already_terminated_before_entering_select() {
            let system = Concurrent::new(VirtualSystem::new());
            let dropped = Rc::new(Cell::new(false));
            let dropped_in_task = Rc::clone(&dropped);
            let mut run = pin!(system.run_virtual(async {
                let _drop_flag = DropFlag(dropped_in_task);
                system.exit(ExitStatus(42)).await;
            }));

            let mut cx = Context::from_waker(Waker::noop());
            assert_eq!(run.as_mut().poll(&mut cx), Ready(()));
            assert!(dropped.get());
        }

        #[test]
        fn run_virtual_blocks_while_stopped_before_select_and_resumes_task_when_process_is_continued()
         {
            let system = Concurrent::new(VirtualSystem::new());
            let completed = Rc::new(Cell::new(false));
            let mut run = pin!(system.run_virtual(async {
                system.raise(SIGSTOP).await.unwrap();
                completed.set(true);
            }));

            let mut cx = Context::from_waker(Waker::noop());
            assert_eq!(run.as_mut().poll(&mut cx), Pending);
            assert_eq!(
                system.inner.current_process().state(),
                ProcessState::stopped(SIGSTOP),
            );
            assert!(!completed.get());

            // Continuing the process lets the task run to completion.
            _ = system.inner.current_process_mut().raise_signal(SIGCONT);
            assert_eq!(run.as_mut().poll(&mut cx), Ready(()));
            assert!(completed.get());
        }

        #[test]
        fn run_virtual_blocks_while_stopped_before_select_and_aborts_when_process_terminates() {
            let system = Concurrent::new(VirtualSystem::new());
            let dropped = Rc::new(Cell::new(false));
            let dropped_in_task = Rc::clone(&dropped);
            let mut run = pin!(system.run_virtual(async {
                let _drop_flag = DropFlag(dropped_in_task);
                system.raise(SIGSTOP).await.unwrap();
                unreachable!("task should be aborted while stopped");
            }));

            let mut cx = Context::from_waker(Waker::noop());
            assert_eq!(run.as_mut().poll(&mut cx), Pending);
            assert!(!dropped.get());

            // Killing the stopped process must abort the task.
            _ = system.inner.current_process_mut().raise_signal(SIGKILL);
            assert_eq!(run.as_mut().poll(&mut cx), Ready(()));
            assert!(dropped.get());
        }

        #[test]
        fn run_virtual_blocks_while_stopped_during_pending_select_and_continues_waiting_after_resume()
         {
            let (system, now) = virtual_system_with_current_time();
            let completed = Rc::new(Cell::new(false));
            let completed_in_task = Rc::clone(&completed);
            let mut run = pin!(system.run_virtual(async {
                system.sleep(Duration::from_secs(1)).await;
                completed_in_task.set(true);
            }));

            let mut cx = Context::from_waker(Waker::noop());
            assert_eq!(run.as_mut().poll(&mut cx), Pending);

            // Stop the process while the run is waiting on `select`.
            _ = system
                .inner
                .current_process_mut()
                .set_state(ProcessState::stopped(SIGSTOP));
            assert_eq!(run.as_mut().poll(&mut cx), Pending);
            assert!(!completed.get());

            // The sleep deadline passes, but the process is still stopped.
            advance_time_to(&system, now + Duration::from_secs(1));
            assert_eq!(run.as_mut().poll(&mut cx), Pending);
            assert!(!completed.get());

            // Once the process runs again, the task can finish.
            _ = system
                .inner
                .current_process_mut()
                .set_state(ProcessState::Running);
            assert_eq!(run.as_mut().poll(&mut cx), Ready(()));
            assert!(completed.get());
        }

        #[test]
        fn run_virtual_blocks_while_stopped_during_pending_select_and_aborts_when_terminated() {
            let (system, _now) = virtual_system_with_current_time();
            let dropped = Rc::new(Cell::new(false));
            let mut run = pin!(system.run_virtual(async {
                let _drop_flag = DropFlag(Rc::clone(&dropped));
                system.sleep(Duration::from_secs(1)).await;
                unreachable!("task should be aborted while sleeping");
            }));

            let mut cx = Context::from_waker(Waker::noop());
            assert_eq!(run.as_mut().poll(&mut cx), Pending);

            // Stop the process while the run is waiting on `select`.
            _ = system
                .inner
                .current_process_mut()
                .set_state(ProcessState::stopped(SIGSTOP));
            assert_eq!(run.as_mut().poll(&mut cx), Pending);
            assert!(!dropped.get());

            // Killing the stopped process must abort the task.
            _ = system.inner.current_process_mut().raise_signal(SIGKILL);
            assert_eq!(run.as_mut().poll(&mut cx), Ready(()));
            assert!(dropped.get());
        }

        #[test]
        fn run_virtual_aborts_immediately_when_process_becomes_terminated_while_waiting_on_pending_select()
         {
            let (system, _now) = virtual_system_with_current_time();
            let dropped = Rc::new(Cell::new(false));
            let mut run = pin!(system.run_virtual(async {
                let _drop_flag = DropFlag(Rc::clone(&dropped));
                system.sleep(Duration::from_secs(1)).await;
                unreachable!("task should be aborted while sleeping");
            }));

            let mut cx = Context::from_waker(Waker::noop());
            assert_eq!(run.as_mut().poll(&mut cx), Pending);
            assert!(!dropped.get());

            // Killing the running process must abort the task on the next poll.
            _ = system.inner.current_process_mut().raise_signal(SIGKILL);
            assert_eq!(run.as_mut().poll(&mut cx), Ready(()));
            assert!(dropped.get());
        }
    }
}