1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
pub mod time; mod utils; use crate::time::{now, TimerEntry}; use futures::future::FutureExt; use rand::{Rng, SeedableRng}; use rand_chacha::ChaCha8Rng; use std::cell::RefCell; use std::collections::binary_heap::BinaryHeap; use std::future::Future; use std::panic::AssertUnwindSafe; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use std::thread::Result; use std::time::SystemTime; type Task = async_task::Task<()>; type JoinHandle<T> = async_task::JoinHandle<T, ()>; pub struct Environment { rng: ChaCha8Rng, current_time: SystemTime, tasks: Vec<Task>, timers: BinaryHeap<TimerEntry>, } thread_local! { pub(crate) static ENV: RefCell<Option<Environment>> = RefCell::new(None); } impl Environment { pub fn new() -> Self { Environment { rng: ChaCha8Rng::from_entropy(), current_time: SystemTime::now(), tasks: vec![], timers: BinaryHeap::new(), } } pub fn new_with_seed(seed: u64) -> Self { Environment { rng: ChaCha8Rng::seed_from_u64(seed), current_time: SystemTime::now(), tasks: vec![], timers: BinaryHeap::new(), } } pub fn block_on<F, R>(self, future: F) -> Result<R> where F: Future<Output = R> + 'static, R: Send + 'static, { let root_future = AssertUnwindSafe(future).catch_unwind(); pin_utils::pin_mut!(root_future); ENV.with(|e| { assert!( e.borrow().is_none(), "Current thread should not have an environment when calling block_on!" ); e.replace(Some(self)); }); let root_runnable_flag = Arc::new(Mutex::new(true)); let waker = { let flag2 = Arc::clone(&root_runnable_flag); async_task::waker_fn(move || *flag2.lock().expect("root waker") = true) }; let root_cx = &mut Context::from_waker(&waker); let result = loop { let root_runnable = *root_runnable_flag.lock().expect("polling root"); let mut num = ENV.with(|e| e.borrow().as_ref().unwrap().tasks.len()); if root_runnable { num += 1; } if num > 0 { let i = ENV.with(|e| e.borrow_mut().as_mut().unwrap().rng.gen_range(0, num)); if root_runnable && i == 0 { *root_runnable_flag.lock().expect("suspending root") = false; if let Poll::Ready(output) = root_future.as_mut().poll(root_cx) { break output; } } else { let index = if root_runnable { i - 1 } else { i }; let task = ENV.with(|e| e.borrow_mut().as_mut().unwrap().tasks.remove(index)); task.run(); } continue; } if let Some(entry) = ENV.with(|e| e.borrow_mut().as_mut().unwrap().timers.pop()) { if entry.wake_time >= now() { ENV.with(|e| { e.borrow_mut().as_mut().unwrap().current_time = entry.wake_time; }); for waker in entry.wakers { waker.wake(); } continue; } } break Err(Box::new("No task is runnable!")); }; ENV.with(|e| { e.replace(None); }); result } } pub fn spawn<F, R>(future: F) -> JoinHandle<Result<R>> where F: Future<Output = R> + 'static, R: Send + 'static, { let future = AssertUnwindSafe(future).catch_unwind(); let schedule = |t| { ENV.with(|e| { e.borrow_mut().as_mut().unwrap().tasks.push(t); }) }; let (task, handle) = async_task::spawn_local(future, schedule, ()); task.schedule(); handle }