// diviner/lib.rs

1pub mod time;
2mod utils;
3
4use crate::time::{now, TimerEntry};
5use futures::future::FutureExt;
6use rand::{Rng, SeedableRng};
7use rand_chacha::ChaCha8Rng;
8use std::cell::RefCell;
9use std::collections::binary_heap::BinaryHeap;
10use std::future::Future;
11use std::panic::AssertUnwindSafe;
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll};
14use std::thread::Result;
15use std::time::SystemTime;
16
17type Task = async_task::Task<()>;
18type JoinHandle<T> = async_task::JoinHandle<T, ()>;
19
20pub struct Environment {
21    rng: ChaCha8Rng,
22    current_time: SystemTime,
23    tasks: Vec<Task>,
24    timers: BinaryHeap<TimerEntry>,
25}
26
27thread_local! {
28    pub(crate) static ENV: RefCell<Option<Environment>> = RefCell::new(None);
29}
30
31impl Environment {
32    pub fn new() -> Self {
33        Environment {
34            rng: ChaCha8Rng::from_entropy(),
35            current_time: SystemTime::now(),
36            tasks: vec![],
37            timers: BinaryHeap::new(),
38        }
39    }
40
41    pub fn new_with_seed(seed: u64) -> Self {
42        Environment {
43            rng: ChaCha8Rng::seed_from_u64(seed),
44            current_time: SystemTime::now(),
45            tasks: vec![],
46            timers: BinaryHeap::new(),
47        }
48    }
49
50    pub fn block_on<F, R>(self, future: F) -> Result<R>
51    where
52        F: Future<Output = R> + 'static,
53        R: Send + 'static,
54    {
55        let root_future = AssertUnwindSafe(future).catch_unwind();
56        pin_utils::pin_mut!(root_future);
57        ENV.with(|e| {
58            assert!(
59                e.borrow().is_none(),
60                "Current thread should not have an environment when calling block_on!"
61            );
62            e.replace(Some(self));
63        });
64        let root_runnable_flag = Arc::new(Mutex::new(true));
65        let waker = {
66            let flag2 = Arc::clone(&root_runnable_flag);
67            async_task::waker_fn(move || *flag2.lock().expect("root waker") = true)
68        };
69        let root_cx = &mut Context::from_waker(&waker);
70        let result = loop {
71            let root_runnable = *root_runnable_flag.lock().expect("polling root");
72            let mut num = ENV.with(|e| e.borrow().as_ref().unwrap().tasks.len());
73            if root_runnable {
74                num += 1;
75            }
76            if num > 0 {
77                let i = ENV.with(|e| e.borrow_mut().as_mut().unwrap().rng.gen_range(0, num));
78                if root_runnable && i == 0 {
79                    *root_runnable_flag.lock().expect("suspending root") = false;
80                    if let Poll::Ready(output) = root_future.as_mut().poll(root_cx) {
81                        break output;
82                    }
83                } else {
84                    let index = if root_runnable { i - 1 } else { i };
85                    let task = ENV.with(|e| e.borrow_mut().as_mut().unwrap().tasks.remove(index));
86                    task.run();
87                }
88                continue;
89            }
90            if let Some(entry) = ENV.with(|e| e.borrow_mut().as_mut().unwrap().timers.pop()) {
91                if entry.wake_time >= now() {
92                    ENV.with(|e| {
93                        e.borrow_mut().as_mut().unwrap().current_time = entry.wake_time;
94                    });
95                    for waker in entry.wakers {
96                        waker.wake();
97                    }
98                    continue;
99                }
100            }
101            break Err(Box::new("No task is runnable!"));
102        };
103        ENV.with(|e| {
104            e.replace(None);
105        });
106        result
107    }
108}
109
110pub fn spawn<F, R>(future: F) -> JoinHandle<Result<R>>
111where
112    F: Future<Output = R> + 'static,
113    R: Send + 'static,
114{
115    let future = AssertUnwindSafe(future).catch_unwind();
116    let schedule = |t| {
117        ENV.with(|e| {
118            e.borrow_mut().as_mut().unwrap().tasks.push(t);
119        })
120    };
121    let (task, handle) = async_task::spawn_local(future, schedule, ());
122    task.schedule();
123
124    handle
125}