1#![forbid(unsafe_code)]
2
3use chashmap::CHashMap;
4use driver::executor_context;
5use futures::channel::oneshot;
6use futures::{pin_mut, FutureExt};
7use parking_lot::{Mutex, RwLock};
8use pin_project::pin_project;
9use rand::Rng;
10use std::future::Future;
11use std::ops::ControlFlow;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll, Wake, Waker};
15use std::thread::{self, Thread};
16
17mod driver;
18mod ready;
19
20pub mod io;
22pub mod timers;
24
25#[derive(Hash, PartialEq, Eq, Clone, Copy, Debug, PartialOrd, Ord)]
26struct TaskId(usize);
27
28type Task = Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;
29#[derive(Default)]
30struct Executor {
31 threads: RwLock<Vec<Thread>>,
32 tasks: CHashMap<TaskId, Task>,
33 timers: timers::Queue,
34 ready: ready::Queue,
35 os: Mutex<io::Os>,
36 parked: Arc<()>,
37}
38
39#[pin_project]
40pub struct SpawnHandle<R> {
41 #[pin]
42 receiver: oneshot::Receiver<R>,
43}
44
45impl<R> Future for SpawnHandle<R> {
46 type Output = R;
47
48 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
49 let mut this = self.project();
50 this.receiver.as_mut().poll(cx).map(|x| x.unwrap())
52 }
53}
54
55impl Executor {
56 pub(crate) fn signal_ready(&self, id: TaskId) {
58 self.ready.push(id);
59
60 let mut threads = self.threads.write();
63 threads.rotate_left(1);
64 threads[0].unpark();
65 }
66
67 pub(crate) fn spawn<F>(&self, fut: F) -> SpawnHandle<F::Output>
69 where
70 F: Future + Send + Sync + 'static,
71 F::Output: Send + Sync + 'static,
72 {
73 let id = TaskId(rand::thread_rng().gen());
75 let (sender, receiver) = oneshot::channel();
76
77 let fut = Box::pin(fut.map(|out| sender.send(out).unwrap_or_default()));
79 self.tasks.insert(id, fut);
81 self.signal_ready(id);
82
83 SpawnHandle { receiver }
85 }
86
87 fn book_keeping(&self) {
90 for id in &self.timers {
92 self.signal_ready(id);
93 }
94
95 let mut os = self.os.lock();
97 os.process()
98 }
99
100 fn block_on<F, R>(self: &Arc<Executor>, fut: F) -> R
104 where
105 F: Future<Output = R> + Send + Sync + 'static,
106 R: Send + Sync + 'static,
107 {
108 self.register();
110
111 for i in 1..8 {
113 let exec = self.clone();
114 thread::Builder::new()
115 .name(format!("tl-async-runtime-worker-{}", i))
116 .spawn(move || {
117 exec.register();
119 while let ControlFlow::Continue(_) = exec.run_task() {}
121 })
122 .unwrap();
123 }
124
125 let handle = self.spawn(fut);
127 pin_mut!(handle);
128
129 let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
132 let mut cx = Context::from_waker(&waker);
133
134 loop {
136 if let Poll::Ready(res) = handle.as_mut().poll(&mut cx) {
138 break res;
139 }
140
141 self.run_task();
143 }
144 }
145}
146
147struct ThreadWaker(Thread);
148
149impl Wake for ThreadWaker {
150 fn wake(self: Arc<Self>) {
151 self.wake_by_ref();
152 }
153 fn wake_by_ref(self: &Arc<Self>) {
154 self.0.unpark();
155 }
156}
157
158pub fn spawn<F, R>(fut: F) -> SpawnHandle<R>
162where
163 F: Future<Output = R> + Send + Sync + 'static,
164 R: Send + Sync + 'static,
165{
166 executor_context(|exec| exec.clone().spawn(fut))
167}
168
169pub fn block_on<F, R>(fut: F) -> R
173where
174 F: Future<Output = R> + Send + Sync + 'static,
175 R: Send + Sync + 'static,
176{
177 let executor = Arc::new(Executor::default());
178 executor.block_on(fut)
179}