thin_main_loop/
future.rs

1//! 0.3 Futures support
2
3use std::future::Future;
4use futures::task;
5use futures::stream::Stream;
6use futures::task::{Poll, Waker, Context, ArcWake};
7use std::pin::Pin;
8use std::mem;
9use std::sync::{Arc, Mutex};
10use crate::{MainLoopError, MainLoop, IODirection, CbHandle, IOAble};
11use std::collections::{HashMap, VecDeque};
12use std::rc::Rc;
13use std::cell::{Cell, RefCell};
14
15use std::time::Instant;
16
17/// Waits until a specific instant.
18pub struct Delay(Instant);
19
20impl Future for Delay {
21    type Output = Result<(), MainLoopError>;
22    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
23        let n = Instant::now();
24        // println!("Polled at {:?}", n);
25        if self.0 <= n { Poll::Ready(Ok(())) }
26        else {
27            let lw = ctx.waker().clone();
28            match crate::call_after(self.0 - n, move || { lw.wake() }) {
29                Ok(_) => Poll::Pending,
30                Err(e) => Poll::Ready(Err(e)),
31            }
32        }
33    }
34}
35
36/// Waits until a specific instant.
37pub fn delay(i: Instant) -> Delay {
38    Delay(i)
39}
40
41struct IoInternal {
42    cb_handle: CbHandle,
43    direction: IODirection,
44    queue: RefCell<VecDeque<Result<IODirection, std::io::Error>>>,
45    alive: Cell<bool>,
46    started: Cell<bool>,
47    waker: RefCell<Option<Waker>>,
48}
49
50/// Io implements "futures::Stream", so it will output an item whenever 
51/// the handle is ready for read / write.
52pub struct Io(Rc<IoInternal>);
53
54impl IOAble for Io {
55    fn handle(&self) -> CbHandle { self.0.cb_handle }
56    fn direction(&self) -> IODirection { self.0.direction }
57    fn on_rw(&mut self, r: Result<IODirection, std::io::Error>) -> bool {
58        self.0.queue.borrow_mut().push_back(r);
59        let w = self.0.waker.borrow();
60        if let Some(waker) = &*w { waker.wake_by_ref() };
61        self.0.alive.get()
62    }
63}
64
65impl Stream for Io {
66    type Item = Result<IODirection, MainLoopError>;
67    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
68        let s: &IoInternal = &(*self).0;
69        if !s.alive.get() { return Poll::Ready(None); }
70
71        if !s.started.get() {
72            // Submit to the reactor
73            let c: &Rc<IoInternal> = &(*self).0;
74            let c = Io(c.clone());
75            if let Err(e) = crate::call_io(c) {
76                s.alive.set(false);
77                return Poll::Ready(Some(Err(e)));
78            }
79            s.started.set(true);
80        }
81
82        let q = s.queue.borrow_mut().pop_front();
83        if let Some(item) = q {
84            let item = item.map_err(|e| MainLoopError::Other(Box::new(e)));
85            Poll::Ready(Some(item))
86        } else {
87            *s.waker.borrow_mut() = Some(ctx.waker().clone());
88            Poll::Pending
89        }
90    }
91}
92
93impl Drop for Io {
94    fn drop(&mut self) {
95        let s: &IoInternal = &(*self).0;
96        s.alive.set(false);
97    }
98}
99
100/// Creates a new Io, which outputs an item whenever the handle is ready for reading / writing. 
101pub fn io(handle: CbHandle, dir: IODirection) -> Io {
102    Io(Rc::new(IoInternal {
103        cb_handle: handle,
104        direction: dir,
105        alive: Cell::new(true),
106        started: Cell::new(false),
107        queue: Default::default(),
108        waker: Default::default(),
109    }))
110}
111
112// And the executor stuff 
113
114type BoxFuture<'a> = Pin<Box<dyn Future<Output=()> + 'a>>;
115
116type RunQueue = Arc<Mutex<Vec<u64>>>;
117
118struct Task(u64, RunQueue);
119
120impl ArcWake for Task {
121    fn wake_by_ref(x: &Arc<Self>) {
122        x.1.lock().unwrap().push(x.0);
123        // println!("Waking up");
124    }
125}
126
127/// A futures executor that supports spawning futures. 
128///
129/// If you use "Delay" or "Io", this is the executor you need to
130/// spawn it on.
131/// It contains a MainLoop inside, so you can spawn 'static callbacks too. 
132pub struct Executor<'a> {
133    ml: MainLoop<'a>,
134    tasks: HashMap<u64, BoxFuture<'a>>,
135    next_task: u64,
136    run_queue: RunQueue,
137}
138
139impl<'a> Executor<'a> {
140    pub fn new() -> Result<Self, MainLoopError> {
141        Ok(Executor { ml: MainLoop::new()?, next_task: 1, run_queue: Default::default(), tasks: Default::default() })
142    }
143
144    /// Runs until the main loop is terminated.
145    pub fn run(&mut self) {
146        while self.run_one(true) {}
147    }
148
149    /// Processes futures ready to make progress.
150    ///
151    /// If no futures are ready to progress, may block in case allow_wait is true.
152    /// Returns false if the mainloop was terminated.
153    pub fn run_one(&mut self, allow_wait: bool) -> bool {
154        let run_queue: Vec<_> = {
155            let mut r = self.run_queue.lock().unwrap();
156            mem::replace(&mut *r, vec!())
157        };
158        if run_queue.len() == 0 {
159            return self.ml.run_one(allow_wait);
160        }
161        for id in run_queue {
162            let remove = {
163                let f = self.tasks.get_mut(&id);
164                if let Some(f) = f {
165                    let pinf = f.as_mut();
166                    let t = Task(id, self.run_queue.clone());
167                    let t = Arc::new(t);
168                    let waker = task::waker_ref(&t);
169                    let mut ctx = Context::from_waker(&waker);
170                    pinf.poll(&mut ctx) != Poll::Pending
171                } else { false }
172            };
173            if remove {
174                self.tasks.remove(&id);
175            }
176        }
177        true
178    }
179
180    /// Runs until the future is ready, or the main loop is terminated.
181    ///
182    /// Returns None if the main loop is terminated, or the result of the future otherwise.
183    pub fn block_on<R: 'a, F: Future<Output=R> + 'a>(&mut self, f: F) -> Option<R> {
184        use futures::future::{FutureExt, ready};
185        let res = Arc::new(RefCell::new(None));
186        let res2 = res.clone();
187        let f = f.then(move |r| { *res2.borrow_mut() = Some(r); ready(()) });
188        self.spawn(f);
189        loop {
190            if !self.run_one(true) { return None };
191            let x = res.borrow_mut().take();
192            if x.is_some() { return x; }
193        }
194    }
195
196    pub fn spawn<F: Future<Output=()> + 'a>(&mut self, f: F) {
197        let x = Box::pin(f);
198        self.tasks.insert(self.next_task, x);
199        self.run_queue.lock().unwrap().push(self.next_task);
200        self.next_task += 1;
201    }
202}
203
204#[test]
205fn delay_test() {
206    use std::time::Duration;
207    use futures::future::{FutureExt, ready};
208
209    let mut x = Executor::new().unwrap();
210    let n = Instant::now() + Duration::from_millis(200);
211    let f = delay(n).then(|_| { println!("Terminating!"); crate::terminate(); ready(()) });
212    x.spawn(f);
213    x.run();
214    assert!(Instant::now() >= n);
215}
216
217#[test]
218fn async_fn_test() {
219    use std::time::Duration;
220
221    async fn foo(n: Instant) {
222        delay(n).await.unwrap();
223    }
224
225    let mut x = Executor::new().unwrap();
226    let n = Instant::now() + Duration::from_millis(200);
227    x.block_on(foo(n));
228    assert!(Instant::now() >= n);
229}
230
231#[test]
232fn async_fn_test_ref() {
233    use std::time::Duration;
234
235    async fn takes_ref(s: &str) {
236        delay(Instant::now() + Duration::from_millis(50)).await.unwrap();
237        println!("{}", s);
238    }
239
240    async fn calls_takes_ref() {
241        let s = String::from("test3");
242        takes_ref(&s).await;
243    }
244
245    fn make_async() -> impl Future {
246        takes_ref("test1")
247    }
248
249/*    fn call_async<'a, F: FnOnce(&'a str) -> G, G: Future + 'a>(f: F) -> G {
250        let s = String::from("test2");
251        f(&s)
252    }
253*/
254    let _z = takes_ref;
255
256    let mut x = Executor::new().unwrap();
257//    x.block_on(call_async(takes_ref));
258    x.block_on(make_async());   
259    x.block_on(calls_takes_ref());
260
261}