// tl_async_runtime/lib.rs

1#![forbid(unsafe_code)]
2
3use chashmap::CHashMap;
4use driver::executor_context;
5use futures::channel::oneshot;
6use futures::{pin_mut, FutureExt};
7use parking_lot::{Mutex, RwLock};
8use pin_project::pin_project;
9use rand::Rng;
10use std::future::Future;
11use std::ops::ControlFlow;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll, Wake, Waker};
15use std::thread::{self, Thread};
16
17mod driver;
18mod ready;
19
20/// Tools used for communicating with the OS
21pub mod io;
22/// Timers used for pausing tasks for fixed durations
23pub mod timers;
24
/// Identifier under which a spawned task is stored in the executor.
///
/// A thin wrapper around a `usize`; it is cheap to copy and usable as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct TaskId(usize);
27
/// A type-erased, heap-pinned unit future, as stored in the executor's task map.
///
/// The `Send + Sync + 'static` bounds mirror those required by the spawn functions.
type Task = Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;
29#[derive(Default)]
30struct Executor {
31    threads: RwLock<Vec<Thread>>,
32    tasks: CHashMap<TaskId, Task>,
33    timers: timers::Queue,
34    ready: ready::Queue,
35    os: Mutex<io::Os>,
36    parked: Arc<()>,
37}
38
39#[pin_project]
40pub struct SpawnHandle<R> {
41    #[pin]
42    receiver: oneshot::Receiver<R>,
43}
44
45impl<R> Future for SpawnHandle<R> {
46    type Output = R;
47
48    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
49        let mut this = self.project();
50        // poll the inner channel for the spawned future's result
51        this.receiver.as_mut().poll(cx).map(|x| x.unwrap())
52    }
53}
54
55impl Executor {
56    /// Signals that a task is now ready to be worked on
57    pub(crate) fn signal_ready(&self, id: TaskId) {
58        self.ready.push(id);
59
60        // get a 'random' thread from the loop and wake it up.
61        // does nothing if the thread is not parked.
62        let mut threads = self.threads.write();
63        threads.rotate_left(1);
64        threads[0].unpark();
65    }
66
67    /// Spawns a task in this executor
68    pub(crate) fn spawn<F>(&self, fut: F) -> SpawnHandle<F::Output>
69    where
70        F: Future + Send + Sync + 'static,
71        F::Output: Send + Sync + 'static,
72    {
73        // get a random task id and channel to send the results over
74        let id = TaskId(rand::thread_rng().gen());
75        let (sender, receiver) = oneshot::channel();
76
77        // Pin the future. Also wrap it s.t. it sends it's output over the channel
78        let fut = Box::pin(fut.map(|out| sender.send(out).unwrap_or_default()));
79        // insert the task into the runtime and signal that it is ready for processing
80        self.tasks.insert(id, fut);
81        self.signal_ready(id);
82
83        // return the handle to the reciever so that it can be `await`ed with it's output value
84        SpawnHandle { receiver }
85    }
86
87    // this is run by any thread that currently is not busy.
88    // It manages the timers and OS polling in order to wake up tasks
89    fn book_keeping(&self) {
90        // get the current task timers that have elapsed and insert them into the ready tasks
91        for id in &self.timers {
92            self.signal_ready(id);
93        }
94
95        // get the OS events
96        let mut os = self.os.lock();
97        os.process()
98    }
99
100    /// Run a future to completion.
101    ///
102    /// Starts a new runtime and spawns the future on it.
103    fn block_on<F, R>(self: &Arc<Executor>, fut: F) -> R
104    where
105        F: Future<Output = R> + Send + Sync + 'static,
106        R: Send + Sync + 'static,
107    {
108        // register this thread as a worker
109        self.register();
110
111        // spawn a bunch of worker threads
112        for i in 1..8 {
113            let exec = self.clone();
114            thread::Builder::new()
115                .name(format!("tl-async-runtime-worker-{}", i))
116                .spawn(move || {
117                    // register this new thread as a worker in the runtime
118                    exec.register();
119                    // Run tasks until told to exit
120                    while let ControlFlow::Continue(_) = exec.run_task() {}
121                })
122                .unwrap();
123        }
124
125        // Spawn the task in the newly created runtime
126        let handle = self.spawn(fut);
127        pin_mut!(handle);
128
129        // Waker specifically for the main thread.
130        // Used to wake up the main thread when the output value is ready
131        let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
132        let mut cx = Context::from_waker(&waker);
133
134        // Run the future to completion.
135        loop {
136            // if the output value is ready, return
137            if let Poll::Ready(res) = handle.as_mut().poll(&mut cx) {
138                break res;
139            }
140
141            // make the main thread busy and also run some tasks
142            self.run_task();
143        }
144    }
145}
146
/// Waker that unparks one specific thread whenever it is woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    /// Unparks the wrapped thread, consuming this reference to the waker.
    fn wake(self: Arc<Self>) {
        let ThreadWaker(target) = &*self;
        target.unpark();
    }

    /// Unparks the wrapped thread without consuming the waker.
    fn wake_by_ref(self: &Arc<Self>) {
        let ThreadWaker(target) = &**self;
        target.unpark();
    }
}
157
158/// Spawn a future on the current runtime.
159/// Returns a new future that can be later awaited for it's output.
160/// Task execution begins eagerly, without needing you to await it
161pub fn spawn<F, R>(fut: F) -> SpawnHandle<R>
162where
163    F: Future<Output = R> + Send + Sync + 'static,
164    R: Send + Sync + 'static,
165{
166    executor_context(|exec| exec.clone().spawn(fut))
167}
168
169/// Run a future to completion.
170///
171/// Starts a new runtime and spawns the future on it.
172pub fn block_on<F, R>(fut: F) -> R
173where
174    F: Future<Output = R> + Send + Sync + 'static,
175    R: Send + Sync + 'static,
176{
177    let executor = Arc::new(Executor::default());
178    executor.block_on(fut)
179}