1use std::future::Future;
4use futures::task;
5use futures::stream::Stream;
6use futures::task::{Poll, Waker, Context, ArcWake};
7use std::pin::Pin;
8use std::mem;
9use std::sync::{Arc, Mutex};
10use crate::{MainLoopError, MainLoop, IODirection, CbHandle, IOAble};
11use std::collections::{HashMap, VecDeque};
12use std::rc::Rc;
13use std::cell::{Cell, RefCell};
14
15use std::time::Instant;
16
17pub struct Delay(Instant);
19
20impl Future for Delay {
21 type Output = Result<(), MainLoopError>;
22 fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
23 let n = Instant::now();
24 if self.0 <= n { Poll::Ready(Ok(())) }
26 else {
27 let lw = ctx.waker().clone();
28 match crate::call_after(self.0 - n, move || { lw.wake() }) {
29 Ok(_) => Poll::Pending,
30 Err(e) => Poll::Ready(Err(e)),
31 }
32 }
33 }
34}
35
36pub fn delay(i: Instant) -> Delay {
38 Delay(i)
39}
40
41struct IoInternal {
42 cb_handle: CbHandle,
43 direction: IODirection,
44 queue: RefCell<VecDeque<Result<IODirection, std::io::Error>>>,
45 alive: Cell<bool>,
46 started: Cell<bool>,
47 waker: RefCell<Option<Waker>>,
48}
49
50pub struct Io(Rc<IoInternal>);
53
54impl IOAble for Io {
55 fn handle(&self) -> CbHandle { self.0.cb_handle }
56 fn direction(&self) -> IODirection { self.0.direction }
57 fn on_rw(&mut self, r: Result<IODirection, std::io::Error>) -> bool {
58 self.0.queue.borrow_mut().push_back(r);
59 let w = self.0.waker.borrow();
60 if let Some(waker) = &*w { waker.wake_by_ref() };
61 self.0.alive.get()
62 }
63}
64
65impl Stream for Io {
66 type Item = Result<IODirection, MainLoopError>;
67 fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
68 let s: &IoInternal = &(*self).0;
69 if !s.alive.get() { return Poll::Ready(None); }
70
71 if !s.started.get() {
72 let c: &Rc<IoInternal> = &(*self).0;
74 let c = Io(c.clone());
75 if let Err(e) = crate::call_io(c) {
76 s.alive.set(false);
77 return Poll::Ready(Some(Err(e)));
78 }
79 s.started.set(true);
80 }
81
82 let q = s.queue.borrow_mut().pop_front();
83 if let Some(item) = q {
84 let item = item.map_err(|e| MainLoopError::Other(Box::new(e)));
85 Poll::Ready(Some(item))
86 } else {
87 *s.waker.borrow_mut() = Some(ctx.waker().clone());
88 Poll::Pending
89 }
90 }
91}
92
93impl Drop for Io {
94 fn drop(&mut self) {
95 let s: &IoInternal = &(*self).0;
96 s.alive.set(false);
97 }
98}
99
100pub fn io(handle: CbHandle, dir: IODirection) -> Io {
102 Io(Rc::new(IoInternal {
103 cb_handle: handle,
104 direction: dir,
105 alive: Cell::new(true),
106 started: Cell::new(false),
107 queue: Default::default(),
108 waker: Default::default(),
109 }))
110}
111
112type BoxFuture<'a> = Pin<Box<dyn Future<Output=()> + 'a>>;
115
116type RunQueue = Arc<Mutex<Vec<u64>>>;
117
118struct Task(u64, RunQueue);
119
120impl ArcWake for Task {
121 fn wake_by_ref(x: &Arc<Self>) {
122 x.1.lock().unwrap().push(x.0);
123 }
125}
126
127pub struct Executor<'a> {
133 ml: MainLoop<'a>,
134 tasks: HashMap<u64, BoxFuture<'a>>,
135 next_task: u64,
136 run_queue: RunQueue,
137}
138
139impl<'a> Executor<'a> {
140 pub fn new() -> Result<Self, MainLoopError> {
141 Ok(Executor { ml: MainLoop::new()?, next_task: 1, run_queue: Default::default(), tasks: Default::default() })
142 }
143
144 pub fn run(&mut self) {
146 while self.run_one(true) {}
147 }
148
149 pub fn run_one(&mut self, allow_wait: bool) -> bool {
154 let run_queue: Vec<_> = {
155 let mut r = self.run_queue.lock().unwrap();
156 mem::replace(&mut *r, vec!())
157 };
158 if run_queue.len() == 0 {
159 return self.ml.run_one(allow_wait);
160 }
161 for id in run_queue {
162 let remove = {
163 let f = self.tasks.get_mut(&id);
164 if let Some(f) = f {
165 let pinf = f.as_mut();
166 let t = Task(id, self.run_queue.clone());
167 let t = Arc::new(t);
168 let waker = task::waker_ref(&t);
169 let mut ctx = Context::from_waker(&waker);
170 pinf.poll(&mut ctx) != Poll::Pending
171 } else { false }
172 };
173 if remove {
174 self.tasks.remove(&id);
175 }
176 }
177 true
178 }
179
180 pub fn block_on<R: 'a, F: Future<Output=R> + 'a>(&mut self, f: F) -> Option<R> {
184 use futures::future::{FutureExt, ready};
185 let res = Arc::new(RefCell::new(None));
186 let res2 = res.clone();
187 let f = f.then(move |r| { *res2.borrow_mut() = Some(r); ready(()) });
188 self.spawn(f);
189 loop {
190 if !self.run_one(true) { return None };
191 let x = res.borrow_mut().take();
192 if x.is_some() { return x; }
193 }
194 }
195
196 pub fn spawn<F: Future<Output=()> + 'a>(&mut self, f: F) {
197 let x = Box::pin(f);
198 self.tasks.insert(self.next_task, x);
199 self.run_queue.lock().unwrap().push(self.next_task);
200 self.next_task += 1;
201 }
202}
203
/// Spawns a 200 ms delay whose continuation calls `crate::terminate()`, runs the executor until
/// `run` returns, and checks that the deadline has passed by then.
#[test]
fn delay_test() {
    use futures::future::{ready, FutureExt};
    use std::time::Duration;

    let mut exec = Executor::new().unwrap();
    let deadline = Instant::now() + Duration::from_millis(200);
    let task = delay(deadline).then(|_| {
        println!("Terminating!");
        crate::terminate();
        ready(())
    });
    exec.spawn(task);
    exec.run();
    assert!(Instant::now() >= deadline);
}
216
/// Drives an `async fn` that awaits a 200 ms delay through `block_on` and checks that the
/// deadline has passed once `block_on` returns.
#[test]
fn async_fn_test() {
    use std::time::Duration;

    async fn wait_until(deadline: Instant) {
        delay(deadline).await.unwrap();
    }

    let mut exec = Executor::new().unwrap();
    let deadline = Instant::now() + Duration::from_millis(200);
    exec.block_on(wait_until(deadline));
    assert!(Instant::now() >= deadline);
}
230
/// Checks that async functions taking a borrowed argument can be driven by `block_on`, both with
/// a `'static` string and with a string owned by an enclosing async function.
#[test]
fn async_fn_test_ref() {
    use std::time::Duration;

    async fn takes_ref(s: &str) {
        let wake_at = Instant::now() + Duration::from_millis(50);
        delay(wake_at).await.unwrap();
        println!("{}", s);
    }

    async fn calls_takes_ref() {
        let owned = String::from("test3");
        takes_ref(&owned).await;
    }

    fn make_async() -> impl Future {
        takes_ref("test1")
    }

    // Binding the function item itself must also compile.
    let _fn_item = takes_ref;

    let mut exec = Executor::new().unwrap();
    exec.block_on(make_async());
    exec.block_on(calls_takes_ref());
}