yash_env/system/concurrency/
run_real.rs1#![cfg(unix)]
20
21use super::super::real::RealSystem;
22use super::Concurrent;
23use super::RunLoop;
24use super::Select as _;
25use futures_util::poll;
26use std::pin::pin;
27
28impl Concurrent<RealSystem> {
29 pub fn run_real<F, T>(&self, task: F) -> T
45 where
46 F: Future<Output = T>,
47 {
48 use std::task::Poll::{Pending, Ready};
49 use std::task::{Context, Waker};
50
51 let runner = pin!(async move {
52 let mut task = pin!(task);
53 loop {
54 if let Ready(result) = poll!(&mut task) {
55 return result;
56 }
57 self.select().await;
58 }
59 });
60 match runner.poll(&mut Context::from_waker(Waker::noop())) {
61 Ready(result) => result,
62 Pending => unreachable!("`RealSystem::select` should never return `Pending`"),
63 }
64 }
65}
66
67impl RunLoop for RealSystem {
69 #[inline(always)]
71 async fn run_loop<F>(concurrent: &Concurrent<Self>, task: F)
72 where
73 F: Future<Output = ()>,
74 {
75 concurrent.run_real(task)
76 }
77}
78
// NOTE(review): every test builds its system with the `unsafe`
// `RealSystem::new()`. The constructor's safety contract is not visible in
// this file — confirm that creating one instance per test upholds it.
#[cfg(test)]
mod tests {
    use super::super::Sleep as _;
    use super::*;
    use std::cell::Cell;
    use std::time::Duration;

    /// A task that is ready on its first poll yields its value directly.
    #[test]
    fn run_real_returns_task_output_immediately_if_ready_on_first_poll() {
        let system = Concurrent::new(unsafe { RealSystem::new() });
        let output = system.run_real(async { 42 });
        assert_eq!(output, 42);
    }

    /// A task that suspends twice is still driven through to its final value.
    #[test]
    fn run_real_keeps_polling_task_until_completion_when_task_yields_multiple_times() {
        let system = Concurrent::new(unsafe { RealSystem::new() });
        let stage = Cell::new(0);
        let pause = Duration::from_millis(1);

        let output = system.run_real(async {
            stage.set(1);
            system.sleep(pause).await;
            stage.set(2);
            system.sleep(pause).await;
            stage.set(3);
            42
        });

        assert_eq!(output, 42);
        assert_eq!(stage.get(), 3);
    }

    /// A task that suspends once resumes and completes after the sleep.
    #[test]
    fn run_real_calls_select_between_task_polls_while_task_is_pending() {
        let system = Concurrent::new(unsafe { RealSystem::new() });
        let stage = Cell::new(0);
        let pause = Duration::from_millis(1);

        let output = system.run_real(async {
            stage.set(1);
            system.sleep(pause).await;
            stage.set(2);
            7
        });

        assert_eq!(output, 7);
        assert_eq!(stage.get(), 2);
    }

    /// A panic raised inside the task unwinds out of `run_real`.
    #[test]
    #[should_panic = "boom"]
    fn run_real_propagates_task_panic_to_caller() {
        let system = Concurrent::new(unsafe { RealSystem::new() });
        system.run_real(async { panic!("boom") })
    }
}