1pub mod time;
2mod utils;
3
4use crate::time::{now, TimerEntry};
5use futures::future::FutureExt;
6use rand::{Rng, SeedableRng};
7use rand_chacha::ChaCha8Rng;
8use std::cell::RefCell;
9use std::collections::binary_heap::BinaryHeap;
10use std::future::Future;
11use std::panic::AssertUnwindSafe;
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll};
14use std::thread::Result;
15use std::time::SystemTime;
16
17type Task = async_task::Task<()>;
18type JoinHandle<T> = async_task::JoinHandle<T, ()>;
19
20pub struct Environment {
21 rng: ChaCha8Rng,
22 current_time: SystemTime,
23 tasks: Vec<Task>,
24 timers: BinaryHeap<TimerEntry>,
25}
26
27thread_local! {
28 pub(crate) static ENV: RefCell<Option<Environment>> = RefCell::new(None);
29}
30
31impl Environment {
32 pub fn new() -> Self {
33 Environment {
34 rng: ChaCha8Rng::from_entropy(),
35 current_time: SystemTime::now(),
36 tasks: vec![],
37 timers: BinaryHeap::new(),
38 }
39 }
40
41 pub fn new_with_seed(seed: u64) -> Self {
42 Environment {
43 rng: ChaCha8Rng::seed_from_u64(seed),
44 current_time: SystemTime::now(),
45 tasks: vec![],
46 timers: BinaryHeap::new(),
47 }
48 }
49
50 pub fn block_on<F, R>(self, future: F) -> Result<R>
51 where
52 F: Future<Output = R> + 'static,
53 R: Send + 'static,
54 {
55 let root_future = AssertUnwindSafe(future).catch_unwind();
56 pin_utils::pin_mut!(root_future);
57 ENV.with(|e| {
58 assert!(
59 e.borrow().is_none(),
60 "Current thread should not have an environment when calling block_on!"
61 );
62 e.replace(Some(self));
63 });
64 let root_runnable_flag = Arc::new(Mutex::new(true));
65 let waker = {
66 let flag2 = Arc::clone(&root_runnable_flag);
67 async_task::waker_fn(move || *flag2.lock().expect("root waker") = true)
68 };
69 let root_cx = &mut Context::from_waker(&waker);
70 let result = loop {
71 let root_runnable = *root_runnable_flag.lock().expect("polling root");
72 let mut num = ENV.with(|e| e.borrow().as_ref().unwrap().tasks.len());
73 if root_runnable {
74 num += 1;
75 }
76 if num > 0 {
77 let i = ENV.with(|e| e.borrow_mut().as_mut().unwrap().rng.gen_range(0, num));
78 if root_runnable && i == 0 {
79 *root_runnable_flag.lock().expect("suspending root") = false;
80 if let Poll::Ready(output) = root_future.as_mut().poll(root_cx) {
81 break output;
82 }
83 } else {
84 let index = if root_runnable { i - 1 } else { i };
85 let task = ENV.with(|e| e.borrow_mut().as_mut().unwrap().tasks.remove(index));
86 task.run();
87 }
88 continue;
89 }
90 if let Some(entry) = ENV.with(|e| e.borrow_mut().as_mut().unwrap().timers.pop()) {
91 if entry.wake_time >= now() {
92 ENV.with(|e| {
93 e.borrow_mut().as_mut().unwrap().current_time = entry.wake_time;
94 });
95 for waker in entry.wakers {
96 waker.wake();
97 }
98 continue;
99 }
100 }
101 break Err(Box::new("No task is runnable!"));
102 };
103 ENV.with(|e| {
104 e.replace(None);
105 });
106 result
107 }
108}
109
110pub fn spawn<F, R>(future: F) -> JoinHandle<Result<R>>
111where
112 F: Future<Output = R> + 'static,
113 R: Send + 'static,
114{
115 let future = AssertUnwindSafe(future).catch_unwind();
116 let schedule = |t| {
117 ENV.with(|e| {
118 e.borrow_mut().as_mut().unwrap().tasks.push(t);
119 })
120 };
121 let (task, handle) = async_task::spawn_local(future, schedule, ());
122 task.schedule();
123
124 handle
125}