what_the_async/
lib.rs

1#![forbid(unsafe_code)]
2#![warn(clippy::pedantic)]
3#![allow(clippy::missing_panics_doc)]
4
5use std::{
6    future::Future,
7    sync::{
8        atomic::{AtomicBool, Ordering},
9        Arc,
10    },
11    task::{Context, Wake, Waker},
12};
13
14use futures::pin_mut;
15use wta_executor::Executor;
16pub use wta_executor::{spawn, JoinHandle};
17use wta_reactor::Reactor;
18pub use wta_reactor::{net, timers};
19
20#[derive(Clone)]
21pub struct Runtime {
22    executor: Arc<Executor>,
23    reactor: Arc<Reactor>,
24}
25
26impl Default for Runtime {
27    fn default() -> Self {
28        let executor: Arc<Executor> = Arc::default();
29        let reactor: Arc<Reactor> = Arc::default();
30        let this = Self { executor, reactor };
31
32        let n = std::thread::available_parallelism().map_or(4, std::num::NonZeroUsize::get);
33        for i in 0..n {
34            this.spawn_worker(format!("wta-worker-{}", i));
35        }
36
37        this
38    }
39}
40
41/// Spawns a blocking function in a new thread, and returns an async handle to it's result.
42pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<F::Output>
43where
44    F: FnOnce() -> R + Send + 'static,
45    R: Send + 'static,
46{
47    let (sender, handle) = JoinHandle::new();
48
49    std::thread::Builder::new()
50        .spawn(move || sender.send(f()).unwrap_or_default())
51        .unwrap();
52
53    // return the handle to the reciever so that it can be `await`ed with it's output value
54    handle
55}
56
57impl Runtime {
58    fn register(&self) {
59        self.executor.register();
60        self.reactor.register();
61    }
62
63    pub fn spawn_worker(&self, name: String) {
64        let this = self.clone();
65        std::thread::Builder::new()
66            .name(name)
67            .spawn(move || {
68                this.register();
69                loop {
70                    this.executor.clone().poll_once();
71                }
72            })
73            .unwrap();
74    }
75
76    pub fn block_on<F>(&self, fut: F) -> F::Output
77    where
78        F: Future,
79    {
80        self.register();
81
82        let ready = Arc::new(AtomicBool::new(true));
83        let waker = Waker::from(Arc::new(MainWaker {
84            ready: ready.clone(),
85        }));
86        let mut cx = Context::from_waker(&waker);
87
88        pin_mut!(fut);
89
90        loop {
91            // see if the future is ready, replacing it with false
92            if ready.load(Ordering::Relaxed) {
93                match fut.as_mut().poll(&mut cx) {
94                    std::task::Poll::Ready(r) => break r,
95                    std::task::Poll::Pending => ready.store(false, Ordering::SeqCst),
96                }
97            } else {
98                self.reactor.book_keeping();
99            }
100        }
101    }
102
103    pub fn spawn<F>(&self, fut: F) -> JoinHandle<F::Output>
104    where
105        F: Future + Sync + Send + 'static,
106        F::Output: Send,
107    {
108        self.executor.spawn(fut)
109    }
110}
111
/// Waker that records a wake-up by setting a shared boolean flag.
///
/// Whoever holds the other clone of `ready` can read the flag to learn that
/// the future asked to be polled again.
struct MainWaker {
    /// Set to `true` on every wake-up.
    ready: Arc<AtomicBool>,
}

impl Wake for MainWaker {
    /// Marks the flag as ready; equivalent to [`Wake::wake_by_ref`].
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    /// Marks the flag as ready without consuming or cloning the `Arc`.
    ///
    /// The store uses `Release` so that a reader observing `true` with
    /// acquire (or stronger) ordering also sees everything the waking thread
    /// wrote before it triggered the wake-up.
    fn wake_by_ref(self: &Arc<Self>) {
        self.ready.store(true, Ordering::Release);
    }
}
120}