1#![forbid(unsafe_code)]
2#![warn(clippy::pedantic)]
3#![allow(clippy::missing_panics_doc)]
4
5use std::{
6 future::Future,
7 sync::{
8 atomic::{AtomicBool, Ordering},
9 Arc,
10 },
11 task::{Context, Wake, Waker},
12};
13
14use futures::pin_mut;
15use wta_executor::Executor;
16pub use wta_executor::{spawn, JoinHandle};
17use wta_reactor::Reactor;
18pub use wta_reactor::{net, timers};
19
/// Handle to the runtime: a shared [`Executor`] paired with a shared [`Reactor`].
///
/// Both fields are `Arc`s, so the derived `Clone` only copies the two
/// handles; every clone refers to the same executor and reactor.
#[derive(Clone)]
pub struct Runtime {
    /// Executor that spawned futures are handed to and that worker threads poll.
    executor: Arc<Executor>,
    /// Reactor whose `book_keeping` is called by `block_on` while its future is not ready.
    reactor: Arc<Reactor>,
}
25
26impl Default for Runtime {
27 fn default() -> Self {
28 let executor: Arc<Executor> = Arc::default();
29 let reactor: Arc<Reactor> = Arc::default();
30 let this = Self { executor, reactor };
31
32 let n = std::thread::available_parallelism().map_or(4, std::num::NonZeroUsize::get);
33 for i in 0..n {
34 this.spawn_worker(format!("wta-worker-{}", i));
35 }
36
37 this
38 }
39}
40
41pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<F::Output>
43where
44 F: FnOnce() -> R + Send + 'static,
45 R: Send + 'static,
46{
47 let (sender, handle) = JoinHandle::new();
48
49 std::thread::Builder::new()
50 .spawn(move || sender.send(f()).unwrap_or_default())
51 .unwrap();
52
53 handle
55}
56
57impl Runtime {
58 fn register(&self) {
59 self.executor.register();
60 self.reactor.register();
61 }
62
63 pub fn spawn_worker(&self, name: String) {
64 let this = self.clone();
65 std::thread::Builder::new()
66 .name(name)
67 .spawn(move || {
68 this.register();
69 loop {
70 this.executor.clone().poll_once();
71 }
72 })
73 .unwrap();
74 }
75
76 pub fn block_on<F>(&self, fut: F) -> F::Output
77 where
78 F: Future,
79 {
80 self.register();
81
82 let ready = Arc::new(AtomicBool::new(true));
83 let waker = Waker::from(Arc::new(MainWaker {
84 ready: ready.clone(),
85 }));
86 let mut cx = Context::from_waker(&waker);
87
88 pin_mut!(fut);
89
90 loop {
91 if ready.load(Ordering::Relaxed) {
93 match fut.as_mut().poll(&mut cx) {
94 std::task::Poll::Ready(r) => break r,
95 std::task::Poll::Pending => ready.store(false, Ordering::SeqCst),
96 }
97 } else {
98 self.reactor.book_keeping();
99 }
100 }
101 }
102
103 pub fn spawn<F>(&self, fut: F) -> JoinHandle<F::Output>
104 where
105 F: Future + Sync + Send + 'static,
106 F::Output: Send,
107 {
108 self.executor.spawn(fut)
109 }
110}
111
/// Waker used by [`Runtime::block_on`]: waking it raises a shared flag that
/// the `block_on` loop checks to decide whether to poll its future again.
struct MainWaker {
    /// Flag shared with the `block_on` loop; `true` means a wake is pending.
    ready: Arc<AtomicBool>,
}

impl Wake for MainWaker {
    /// Raises the wake flag.
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    /// Raises the wake flag without consuming the `Arc`.
    ///
    /// Overridden because the default `wake_by_ref` clones the `Arc` just to
    /// call `wake`.
    fn wake_by_ref(self: &Arc<Self>) {
        // `Release` so that a reader observing the flag with an `Acquire`
        // (or stronger) operation also sees the writes made before the wake.
        self.ready.store(true, Ordering::Release);
    }
}