1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#![no_std]
extern crate alloc;
pub use executor_macros::*;
use lazy_static::*;
use {
alloc::{boxed::Box, collections::vec_deque::VecDeque, sync::Arc},
core::{
future::Future,
pin::Pin,
task::{Context, Poll},
},
spin::Mutex,
woke::{waker_ref, Woke},
};
/// A minimal executor that keeps its unfinished tasks in a double-ended queue.
///
/// Tasks are appended at the back when added and taken from the front when polled.
pub struct Executor {
    /// Queued tasks, boxed as trait objects so that tasks with different output
    /// types can share a single queue.
    tasks: VecDeque<Box<dyn Pendable + Send + Sync>>,
}
/// Type-erased view of a queued task.
///
/// The executor only needs to know whether a task still has work left, so this
/// trait hides the task's concrete output type behind a single query.
trait Pendable {
    /// Returns `true` while the task has not finished, `false` once it is done.
    fn is_pending(&self) -> bool;
}
impl Default for Executor {
fn default() -> Self {
Executor {
tasks: VecDeque::new(),
}
}
}
/// A future bundled with the mutex that guards it while it is being polled.
///
/// `T` is the output type the wrapped future resolves to.
struct Task<T> {
    /// The pinned, heap-allocated future; callers lock it before each poll.
    pub future: Mutex<Pin<Box<dyn Future<Output = T> + Send + 'static>>>,
}
impl<T> Woke for Task<T> {
    /// Reacts to a wake-up by polling every task queued on the default executor.
    ///
    /// The woken task itself is ignored; the whole queue of `DEFAULT_EXECUTOR`
    /// gets one polling pass.
    // NOTE(review): this takes the `DEFAULT_EXECUTOR` lock. If a future calls its
    // waker synchronously while it is being polled under that same lock, this
    // presumably spins forever (assuming `spin::Mutex` is not re-entrant) — confirm.
    fn wake_by_ref(_: &Arc<Self>) {
        let mut executor = DEFAULT_EXECUTOR.lock();
        executor.poll_tasks();
    }
}
impl<T> Pendable for Arc<Task<T>> {
fn is_pending(&self) -> bool {
let mut future = self.future.lock();
let waker = waker_ref(&self);
let context = &mut Context::from_waker(&*waker);
let check_pending = matches!(future.as_mut().poll(context), Poll::Pending);
check_pending
}
}
impl Executor {
    /// Drives `future` to completion by polling it in a busy loop and returns
    /// its output.
    ///
    /// The future is wrapped in a fresh task that is never placed on the queue,
    /// so tasks added through [`Executor::run`] are not touched here.
    fn block_on<T>(&mut self, future: Box<dyn Future<Output = T> + 'static + Send + Unpin>) -> T
    where
        T: Send + 'static,
    {
        let task = Arc::new(Task {
            future: Mutex::new(Box::pin(future)),
        });
        // The lock guard, waker and context are identical for every poll, so they
        // are created once instead of being rebuilt on each spin of the loop.
        let mut pinned = task.future.lock();
        let waker = waker_ref(&task);
        let mut context = Context::from_waker(&waker);
        loop {
            if let Poll::Ready(output) = pinned.as_mut().poll(&mut context) {
                return output;
            }
        }
    }

    /// Queues `future` as a new task and immediately gives every queued task
    /// one polling pass.
    pub fn run<T>(&mut self, future: Box<dyn Future<Output = T> + 'static + Send + Unpin>)
    where
        T: Send + 'static,
    {
        self.add_task(future);
        self.poll_tasks();
    }

    /// Wraps `future` in a task, appends it to the back of the queue and returns
    /// a shared handle to that task.
    fn add_task<T>(
        &mut self,
        future: Box<dyn Future<Output = T> + 'static + Send + Unpin>,
    ) -> Arc<Task<T>>
    where
        T: Send + 'static,
    {
        let task = Arc::new(Task {
            future: Mutex::new(Box::pin(future)),
        });
        self.tasks.push_back(Box::new(task.clone()));
        task
    }

    /// Polls each task that is queued when the call starts exactly once.
    ///
    /// Tasks that are still pending go back to the end of the queue; finished
    /// tasks are dropped.
    fn poll_tasks(&mut self) {
        // Snapshot the length so tasks re-queued during this pass are not polled
        // a second time.
        let queued_at_start = self.tasks.len();
        for _ in 0..queued_at_start {
            match self.tasks.pop_front() {
                Some(task) => {
                    if task.is_pending() {
                        self.tasks.push_back(task);
                    }
                }
                // Nothing left to poll; stop instead of panicking.
                None => break,
            }
        }
    }
}
lazy_static! {
    /// The executor shared by the free functions in this file and by task wakers,
    /// guarded by a mutex and created lazily with an empty queue.
    static ref DEFAULT_EXECUTOR: Mutex<Box<Executor>> =
        Mutex::new(Box::new(Executor::default()));
}
pub fn block_on<T>(future: impl Future<Output = T> + 'static + Send) -> T
where
T: Send + 'static,
{
DEFAULT_EXECUTOR.lock().block_on(Box::new(Box::pin(future)))
}
/// Queues `future` on the default executor and gives every queued task one
/// polling pass.
///
/// The future's output is discarded; a task that is still pending after the
/// pass stays in the queue.
pub fn run<T>(future: impl Future<Output = T> + 'static + Send)
where
    T: Send + 'static,
{
    let boxed = Box::new(Box::pin(future));
    let mut executor = DEFAULT_EXECUTOR.lock();
    executor.run(boxed);
}