1use slabmap::SlabMap;
2use std::{
3 cell::RefCell,
4 collections::VecDeque,
5 future::Future,
6 mem::{replace, swap},
7 ops::ControlFlow,
8 pin::{pin, Pin},
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc, Mutex,
12 },
13 task::{Context, Poll, Wake, Waker},
14};
15
16const ID_NULL: usize = usize::MAX;
17const ID_MAIN: usize = usize::MAX - 1;
18
19pub trait RuntimeInjector: 'static {
20 fn waker(&self) -> Arc<dyn RuntimeWaker>;
21}
22pub trait RuntimeLoop {
23 fn waker(&self) -> Arc<dyn RuntimeWaker>;
24 fn run<T>(&self, on_step: impl FnMut() -> ControlFlow<T>) -> T;
25}
26
27pub trait RuntimeWaker: 'static + Send + Sync {
28 fn wake(&self);
29}
30
31pub fn run<F: Future>(l: &impl RuntimeLoop, future: F) -> F::Output {
32 let mut runner = Runner::new(l.waker(), None);
33 Runtime::enter(&runner.rc);
34 runner.rc.push_wake(ID_MAIN);
35
36 let mut main = pin!(future);
37 let main_wake = TaskWake::new(ID_MAIN, &runner.rc);
38 let value = l.run(|| {
39 while runner.ready_requests() {
40 for id in runner.wakes.drain(..) {
41 if id == ID_MAIN {
42 match main
43 .as_mut()
44 .poll(&mut Context::from_waker(&main_wake.waker()))
45 {
46 Poll::Ready(value) => return ControlFlow::Break(value),
47 Poll::Pending => {}
48 }
49 } else {
50 run_item(&mut runner.rs[id]);
51 }
52 }
53 runner.apply_drops();
54 }
55 ControlFlow::Continue(())
56 });
57 Runtime::leave();
58 value
59}
60
61thread_local! {
62 static RUNNER: RefCell<Option<Runner>> = RefCell::new(None);
63}
64
65pub fn enter(injector: impl RuntimeInjector) {
66 let runner = Runner::new(injector.waker(), Some(Box::new(injector)));
67 Runtime::enter(&runner.rc);
68 RUNNER.with(|r| *r.borrow_mut() = Some(runner));
69}
70pub fn leave() {
71 let runner = RUNNER.with(|r| r.borrow_mut().take().expect("runtime is not exists"));
72 Runtime::leave();
73 drop(runner);
74}
75pub fn on_step() {
76 RUNNER.with(|r| {
77 r.borrow_mut()
78 .as_mut()
79 .expect("runtime is not exists")
80 .step()
81 });
82}
83pub fn on_idle() -> bool {
84 if let Some(on_idle) = Runtime::with(|rt| rt.rc.pop_on_idle()) {
85 on_idle.wake();
86 true
87 } else {
88 false
89 }
90}
91
92#[must_use]
98#[track_caller]
99pub fn spawn_local<F: Future + 'static>(future: F) -> Task<F::Output> {
100 Runtime::with(|rt| {
101 let need_wake = rt.rs.is_empty();
102 let task = RawTask::new(&rt.rc);
103 rt.rs.push(Box::pin(RawRunnable {
104 task: task.clone(),
105 future,
106 }));
107 if need_wake {
108 rt.rc.0.waker.wake();
109 }
110 Task {
111 task,
112 is_detach: false,
113 }
114 })
115}
116
117pub async fn wait_for_idle() {
121 struct WaitForIdle {
122 is_ready: bool,
123 }
124 impl Future for WaitForIdle {
125 type Output = ();
126
127 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128 if self.is_ready {
129 Poll::Ready(())
130 } else {
131 self.is_ready = true;
132 Runtime::with(|rt| rt.rc.push_on_idle(cx.waker().clone()));
133 Poll::Pending
134 }
135 }
136 }
137
138 WaitForIdle { is_ready: false }.await;
139}
140
141#[derive(Clone)]
142struct RequestChannel(Arc<RequestsData>);
143
144impl RequestChannel {
145 fn new(waker: Arc<dyn RuntimeWaker>) -> Self {
146 Self(Arc::new(RequestsData {
147 reqs: Mutex::new(RawRequests::new()),
148 waker,
149 }))
150 }
151 fn swap(&self, wakes: &mut Vec<usize>, drops: &mut Vec<usize>) {
152 let mut d = self.0.reqs.lock().unwrap();
153 swap(wakes, &mut d.wakes);
154 swap(drops, &mut d.drops);
155 }
156 fn push_with(&self, f: impl FnOnce(&mut RawRequests)) {
157 let mut d = self.0.reqs.lock().unwrap();
158 let call_wake = d.is_empty();
159 f(&mut d);
160 if call_wake {
161 self.0.waker.wake();
162 }
163 }
164 fn push_wake(&self, id: usize) {
165 self.push_with(|d| d.wakes.push(id));
166 }
167 fn push_drop(&self, id: usize) {
168 self.push_with(|d| d.drops.push(id));
169 }
170 fn push_on_idle(&self, waker: Waker) {
171 self.push_with(|d| d.on_idle.push_back(waker));
172 }
173 fn pop_on_idle(&self) -> Option<Waker> {
174 self.0.reqs.lock().unwrap().on_idle.pop_front()
175 }
176}
177struct RequestsData {
178 waker: Arc<dyn RuntimeWaker>,
179 reqs: Mutex<RawRequests>,
180}
181
182struct RawRequests {
183 wakes: Vec<usize>,
184 drops: Vec<usize>,
185 on_idle: VecDeque<Waker>,
186}
187
188impl RawRequests {
189 fn new() -> Self {
190 Self {
191 wakes: Vec::new(),
192 drops: Vec::new(),
193 on_idle: VecDeque::new(),
194 }
195 }
196 fn is_empty(&self) -> bool {
197 self.wakes.is_empty() && self.drops.is_empty() && self.on_idle.is_empty()
198 }
199}
200
201thread_local! {
202 static RUNTIME: RefCell<Option<Runtime>> = RefCell::new(None);
203}
204
205struct Runtime {
206 rc: RequestChannel,
207 rs: Vec<Pin<Box<dyn DynRunnable>>>,
208}
209
210impl Runtime {
211 fn new(rc: RequestChannel) -> Self {
212 Self { rc, rs: Vec::new() }
213 }
214 fn enter(rc: &RequestChannel) {
215 RUNTIME.with(|rt| {
216 let mut rt = rt.borrow_mut();
217 if rt.is_some() {
218 panic!("runtime is already running");
219 }
220 *rt = Some(Runtime::new(rc.clone()));
221 })
222 }
223 fn leave() {
224 RUNTIME.with(|rt| rt.borrow_mut().take());
225 }
226 #[track_caller]
227 fn with<T>(f: impl FnOnce(&mut Self) -> T) -> T {
228 RUNTIME
229 .with(|rt| rt.borrow_mut().as_mut().map(f))
230 .expect("runtime is not running")
231 }
232}
233
234pub struct Task<T> {
240 task: Arc<RawTask<T>>,
241 is_detach: bool,
242}
243
244struct RawTask<T> {
245 state: Mutex<TaskState<T>>,
246 reqs: RequestChannel,
247}
248
249enum TaskState<T> {
250 Running { id: usize, waker: Option<Waker> },
251 Cancelled,
252 Completed(T),
253 Finished,
254}
255
256impl<T> Task<T> {
257 pub fn detach(mut self) {
259 self.is_detach = true;
260 }
261}
262
263impl<T> Drop for Task<T> {
264 fn drop(&mut self) {
265 if !self.is_detach {
266 let mut state = self.task.state.lock().unwrap();
267 if let &TaskState::Running { id, .. } = &*state {
268 *state = TaskState::Cancelled;
269 if id != ID_NULL {
270 self.task.reqs.push_wake(id);
271 }
272 }
273 }
274 }
275}
276impl<T> Future for Task<T> {
277 type Output = T;
278
279 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
280 let mut state = self.task.state.lock().unwrap();
281 match &*state {
282 &TaskState::Running { id, .. } => {
283 *state = TaskState::Running {
284 id,
285 waker: Some(cx.waker().clone()),
286 };
287 Poll::Pending
288 }
289 TaskState::Cancelled => Poll::Pending,
290 TaskState::Completed(_) => {
291 if let TaskState::Completed(value) = replace(&mut *state, TaskState::Finished) {
292 Poll::Ready(value)
293 } else {
294 unreachable!()
295 }
296 }
297 TaskState::Finished => panic!("`poll` called twice"),
298 }
299 }
300}
301
302impl<T> RawTask<T> {
303 fn new(rc: &RequestChannel) -> Arc<Self> {
304 Arc::new(RawTask {
305 state: Mutex::new(TaskState::Running {
306 id: ID_NULL,
307 waker: None,
308 }),
309 reqs: rc.clone(),
310 })
311 }
312 fn complete(&self, value: T) {
313 if let TaskState::Running {
314 waker: Some(waker), ..
315 } = replace(
316 &mut *self.state.lock().unwrap(),
317 TaskState::Completed(value),
318 ) {
319 waker.wake()
320 }
321 }
322 fn is_cancelled(&self) -> bool {
323 matches!(&*self.state.lock().unwrap(), TaskState::Cancelled)
324 }
325}
326
327trait DynRunnable {
328 fn set_id(self: Pin<&Self>, id: usize);
329 fn run(self: Pin<&mut Self>, waker: &Waker) -> bool;
330}
331
332struct RawRunnable<F: Future> {
333 task: Arc<RawTask<F::Output>>,
334 future: F,
335}
336impl<Fut: Future> DynRunnable for RawRunnable<Fut> {
337 fn set_id(self: Pin<&Self>, id: usize) {
338 if let TaskState::Running { id: id_, .. } = &mut *self.task.state.lock().unwrap() {
339 *id_ = id;
340 }
341 }
342 fn run(self: Pin<&mut Self>, waker: &Waker) -> bool {
343 if self.task.is_cancelled() {
344 false
345 } else {
346 unsafe {
347 let this = self.get_unchecked_mut();
348 let f = Pin::new_unchecked(&mut this.future);
349 if let Poll::Ready(value) = f.poll(&mut Context::from_waker(waker)) {
350 this.task.complete(value);
351 false
352 } else {
353 true
354 }
355 }
356 }
357 }
358}
359
360struct Runner {
361 rc: RequestChannel,
362 wakes: Vec<usize>,
363 drops: Vec<usize>,
364 rs: SlabMap<Option<Runnable>>,
365 _injector: Option<Box<dyn RuntimeInjector>>,
366}
367
368impl Runner {
369 fn new(waker: Arc<dyn RuntimeWaker>, injector: Option<Box<dyn RuntimeInjector>>) -> Self {
370 Self {
371 rc: RequestChannel::new(waker),
372 wakes: Vec::new(),
373 drops: Vec::new(),
374 rs: SlabMap::new(),
375 _injector: injector,
376 }
377 }
378 fn ready_requests(&mut self) -> bool {
379 self.rc.swap(&mut self.wakes, &mut self.drops);
380 Runtime::with(|rt| {
381 for r in rt.rs.drain(..) {
382 self.wakes.push(
383 self.rs
384 .insert_with_key(|id| Some(Runnable::new(r, id, &self.rc))),
385 );
386 }
387 });
388 !self.wakes.is_empty() || !self.drops.is_empty()
389 }
390 fn apply_drops(&mut self) {
391 for id in self.drops.drain(..) {
392 self.rs.remove(id);
393 }
394 }
395
396 fn step(&mut self) {
397 while self.ready_requests() {
398 for id in self.wakes.drain(..) {
399 run_item(&mut self.rs[id]);
400 }
401 self.apply_drops();
402 }
403 }
404}
405
406struct Runnable {
407 wake: Arc<TaskWake>,
408 r: Pin<Box<dyn DynRunnable>>,
409}
410
411impl Runnable {
412 fn new(r: Pin<Box<dyn DynRunnable>>, id: usize, rc: &RequestChannel) -> Self {
413 r.as_ref().set_id(id);
414 Self {
415 wake: TaskWake::new(id, rc),
416 r,
417 }
418 }
419 fn run(&mut self) -> bool {
420 self.r.as_mut().run(&self.wake.waker())
421 }
422}
423fn run_item(r: &mut Option<Runnable>) {
424 if let Some(runnable) = r {
425 if !runnable.run() {
426 r.take();
427 }
428 }
429}
430
431struct TaskWake {
432 id: usize,
433 is_wake: AtomicBool,
434 rc: RequestChannel,
435}
436
437impl TaskWake {
438 fn new(id: usize, rc: &RequestChannel) -> Arc<Self> {
439 Arc::new(TaskWake {
440 id,
441 is_wake: AtomicBool::new(true),
442 rc: rc.clone(),
443 })
444 }
445 fn waker(self: &Arc<Self>) -> Waker {
446 self.is_wake.store(false, Ordering::SeqCst);
447 self.clone().into()
448 }
449}
450
451impl Wake for TaskWake {
452 fn wake(self: Arc<Self>) {
453 if !self.is_wake.swap(true, Ordering::SeqCst) {
454 self.rc.push_wake(self.id)
455 }
456 }
457}
458impl Drop for TaskWake {
459 fn drop(&mut self) {
460 self.rc.push_drop(self.id);
461 }
462}