1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use crate::{ParallelSend, Ref};
use async_task::Runnable;
use flume::{r#async::RecvStream, unbounded, Receiver, Sender};
use futures_lite::Stream;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
sync::atomic::{AtomicBool, Ordering},
task::{Context, Poll, Waker},
};
#[cfg(feature = "parallel")]
use async_task::spawn as spawn_task;
#[cfg(not(feature = "parallel"))]
use async_task::spawn_local as spawn_task;
pin_project! {
/// Future that drives the tasks submitted through the paired `Spawner`.
///
/// Each poll takes at most one `Runnable` from the task stream and runs it. When the stream
/// yields no task, the poll stores `true` into the shared idle flag and wakes every waker
/// currently queued on `idles`.
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
pub struct Executor {
// Stream side of the task channel; runnables pushed by the spawner arrive here.
#[pin]
tasks: RecvStream<'static, Runnable>,
// Receiving half of the waker channel; drained whenever a poll finds no task to run.
idles: Receiver<Waker>,
// Idle flag shared with the spawner; created as `true` and stored `true` on idle polls.
idle: Ref<AtomicBool>,
}
}
impl Executor {
    /// Creates an executor together with the spawner that feeds it.
    ///
    /// Two unbounded channels connect the pair: one carries runnables to execute, the other
    /// carries wakers that want to be notified when the executor has nothing to run. Both
    /// halves also share a single idle flag, which starts out as `true`.
    pub(crate) fn new() -> (Self, Spawner) {
        let (runnable_tx, runnable_rx) = unbounded();
        let (waker_tx, waker_rx) = unbounded();
        let idle_flag = Ref::new(AtomicBool::new(true));

        let spawner = Spawner {
            tasks: runnable_tx,
            idles: waker_tx,
            idle: idle_flag.clone(),
        };
        let executor = Self {
            tasks: runnable_rx.into_stream(),
            idles: waker_rx,
            idle: idle_flag,
        };

        (executor, spawner)
    }
}
impl Future for Executor {
    type Output = ();

    /// Runs at most one queued task per poll.
    ///
    /// * A task was available: it is run, the executor wakes itself so it gets polled again,
    ///   and the poll returns `Pending` without touching the idle state.
    /// * The task stream is pending or has ended: the idle flag is stored as `true`, every
    ///   waker queued on `idles` is woken, and the poll returns `Pending` (stream pending) or
    ///   `Ready(())` (stream ended).
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let outcome = match self.as_mut().project().tasks.poll_next(cx) {
            Poll::Ready(Some(runnable)) => {
                runnable.run();
                // More runnables may already be queued; request another poll right away.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready(None) => Poll::Ready(()),
            Poll::Pending => Poll::Pending,
        };

        // NOTE(review): nothing visible here ever stores `false` into `idle`, and it is created
        // as `true`, so the flag appears to stay `true` for the executor's whole life. Confirm
        // whether it was meant to be cleared when a task is run.
        self.idle.store(true, Ordering::SeqCst);
        while let Ok(waker) = self.idles.try_recv() {
            waker.wake();
        }

        outcome
    }
}
/// Handle for submitting futures to the paired `Executor` and for querying its idle flag.
pub struct Spawner {
/// Sending half of the task channel; scheduled runnables are pushed here.
tasks: Sender<Runnable>,
/// Sending half of the waker channel; cloned into `Idle` futures that need to register a waker.
idles: Sender<Waker>,
/// Idle flag shared with the executor; read by `idle()`.
idle: Ref<AtomicBool>,
}
impl Spawner {
    /// Spawns `future` as a detached task on the paired executor.
    ///
    /// The future's output is discarded. The task handle is detached immediately, and the
    /// task is scheduled once so that the executor picks it up.
    ///
    /// # Panics
    ///
    /// Panics if pushing the runnable onto the task channel fails (see `schedule`).
    pub fn spawn<F>(&self, future: F)
    where
        F: Future + ParallelSend + 'static,
    {
        let scheduler = self.schedule();
        let discard_output = async move {
            future.await;
        };
        let (runnable, handle) = spawn_task(discard_output, scheduler);
        handle.detach();
        runnable.schedule();
    }

    /// Builds the scheduling callback handed to `async_task`.
    ///
    /// The callback owns its own clone of the task sender and forwards every runnable it is
    /// given to the task channel, panicking if the send fails.
    fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
        let queue = self.tasks.clone();
        move |runnable: Runnable| {
            let sent = queue.send(runnable);
            sent.expect("Async executor unfortunately destroyed");
        }
    }

    /// Returns an `Idle` future based on the current value of the shared idle flag.
    ///
    /// When the flag is set, the returned future carries no sender; otherwise it carries a
    /// clone of the waker-channel sender so it can register its waker when polled.
    pub fn idle(&self) -> Idle {
        match self.idle.load(Ordering::SeqCst) {
            true => Idle::default(),
            false => Idle::new(&self.idles),
        }
    }
}
/// Future returned by `Spawner::idle`.
///
/// Wraps `None` when created through `Default`, in which case polling it is immediately ready,
/// or `Some(sender)` when polling must send the caller's waker over the waker channel.
#[derive(Default)]
pub struct Idle(Option<Sender<Waker>>);
impl Idle {
fn new(sender: &Sender<Waker>) -> Self {
Self(Some(sender.clone()))
}
}
impl Future for Idle {
    type Output = ();

    /// Registers the caller's waker with the executor once, then completes on the next poll.
    ///
    /// * No sender (created through `Default`): ready immediately.
    /// * First poll with a sender: the waker is sent over the waker channel and the poll
    ///   returns `Pending`. If the send fails, the poll is ready immediately.
    /// * Any later poll: ready.
    ///
    /// Because this future has no access to the idle flag, any re-poll after registration
    /// completes it, including one that was not triggered by the executor's wake-up.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // Consuming the sender makes registration one-shot. Keeping it around would make every
        // poll (including the one caused by the executor's wake-up) send the waker again and
        // return `Pending`, so the future could never resolve while the receiver is alive.
        match self.0.take() {
            Some(sender) if sender.send(cx.waker().clone()).is_ok() => Poll::Pending,
            _ => Poll::Ready(()),
        }
    }
}