1use std::cell::{Cell, UnsafeCell};
2use std::collections::VecDeque;
3use std::{future::Future, io, sync::Arc, thread};
4
5use async_task::Runnable;
6use crossbeam_queue::SegQueue;
7use swap_buffer_queue::{Queue, buffer::ArrayBuffer, error::TryEnqueueError};
8
9use crate::{driver::Driver, driver::Notify, driver::PollResult, handle::JoinHandle};
10
11scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
12
13#[derive(Debug)]
14pub struct Runtime {
18 stop: Cell<bool>,
19 queue: Arc<RunnableQueue>,
20}
21
22impl Runtime {
23 pub fn new(handle: Box<dyn Notify>) -> Self {
25 Self::builder().build(handle)
26 }
27
28 pub fn builder() -> RuntimeBuilder {
30 RuntimeBuilder::new()
31 }
32
33 #[allow(clippy::arc_with_non_send_sync)]
34 fn with_builder(builder: &RuntimeBuilder, handle: Box<dyn Notify>) -> Self {
35 Self {
36 stop: Cell::new(false),
37 queue: Arc::new(RunnableQueue::new(builder.event_interval, handle)),
38 }
39 }
40
41 pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
47 #[cold]
48 fn not_in_neon_runtime() -> ! {
49 panic!("not in a neon runtime")
50 }
51
52 if CURRENT_RUNTIME.is_set() {
53 CURRENT_RUNTIME.with(f)
54 } else {
55 not_in_neon_runtime()
56 }
57 }
58
59 #[inline]
60 pub fn handle(&self) -> Handle {
62 Handle {
63 queue: self.queue.clone(),
64 }
65 }
66
67 pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
72 unsafe { self.spawn_unchecked(future) }
73 }
74
75 pub unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> JoinHandle<F::Output> {
81 let queue = self.queue.clone();
82 let schedule = move |runnable| {
83 queue.schedule(runnable);
84 };
85 let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
86 runnable.schedule();
87 JoinHandle::new(task)
88 }
89
90 pub fn poll(&self) -> PollResult {
92 if self.stop.get() {
93 PollResult::Ready
94 } else if self.queue.run() {
95 PollResult::PollAgain
96 } else {
97 PollResult::Pending
98 }
99 }
100
101 pub fn block_on<F: Future>(&self, future: F, driver: &dyn Driver) -> F::Output {
109 self.stop.set(false);
110
111 CURRENT_RUNTIME.set(self, || {
112 let mut result = None;
113 unsafe {
114 self.spawn_unchecked(async {
115 result = Some(future.await);
116 self.stop.set(true);
117 let _ = self.queue.handle.notify();
118 });
119 }
120
121 driver.run(self).expect("Driver failed");
122 result.expect("Driver failed to poll")
123 })
124 }
125}
126
127impl Drop for Runtime {
128 fn drop(&mut self) {
129 CURRENT_RUNTIME.set(self, || {
130 self.queue.clear();
131 });
132 }
133}
134
135#[derive(Debug)]
136pub struct Handle {
138 queue: Arc<RunnableQueue>,
139}
140
141impl Handle {
142 pub fn current() -> Handle {
146 Runtime::with_current(Runtime::handle)
147 }
148
149 pub fn notify(&self) -> io::Result<()> {
151 self.queue.handle.notify()
152 }
153
154 pub fn spawn<F: Future + Send + 'static>(&self, future: F) -> JoinHandle<F::Output> {
159 let queue = self.queue.clone();
160 let schedule = move |runnable| {
161 queue.schedule(runnable);
162 };
163 let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
164 runnable.schedule();
165 JoinHandle::new(task)
166 }
167}
168
169impl Clone for Handle {
170 fn clone(&self) -> Self {
171 Self {
172 queue: self.queue.clone(),
173 }
174 }
175}
176
177#[derive(Debug)]
178struct RunnableQueue {
179 id: thread::ThreadId,
180 idle: Cell<bool>,
181 handle: Box<dyn Notify>,
182 event_interval: usize,
183 local_queue: UnsafeCell<VecDeque<Runnable>>,
184 sync_fixed_queue: Queue<ArrayBuffer<Runnable, 128>>,
185 sync_queue: SegQueue<Runnable>,
186}
187
188unsafe impl Send for RunnableQueue {}
189unsafe impl Sync for RunnableQueue {}
190
191impl RunnableQueue {
192 fn new(event_interval: usize, handle: Box<dyn Notify>) -> Self {
193 Self {
194 handle,
195 event_interval,
196 id: thread::current().id(),
197 idle: Cell::new(true),
198 local_queue: UnsafeCell::new(VecDeque::new()),
199 sync_fixed_queue: Queue::default(),
200 sync_queue: SegQueue::new(),
201 }
202 }
203
204 fn schedule(&self, runnable: Runnable) {
205 if self.id == thread::current().id() {
206 unsafe { (*self.local_queue.get()).push_back(runnable) };
207 if self.idle.get() {
208 self.idle.set(false);
209 self.handle.notify().ok();
210 }
211 } else {
212 let result = self.sync_fixed_queue.try_enqueue([runnable]);
213 if let Err(TryEnqueueError::InsufficientCapacity([runnable])) = result {
214 self.sync_queue.push(runnable);
215 }
216 self.handle.notify().ok();
217 }
218 }
219
220 fn run(&self) -> bool {
221 for _ in 0..self.event_interval {
222 let task = unsafe { (*self.local_queue.get()).pop_front() };
223 if let Some(task) = task {
224 task.run();
225 } else {
226 break;
227 }
228 }
229
230 if let Ok(buf) = self.sync_fixed_queue.try_dequeue() {
231 for task in buf {
232 task.run();
233 }
234 }
235
236 for _ in 0..self.event_interval {
237 if !self.sync_queue.is_empty()
238 && let Some(task) = self.sync_queue.pop()
239 {
240 task.run();
241 continue;
242 }
243 break;
244 }
245
246 let more_tasks = !unsafe { (*self.local_queue.get()).is_empty() }
247 || !self.sync_fixed_queue.is_empty()
248 || !self.sync_queue.is_empty();
249
250 if !more_tasks {
251 self.idle.set(true);
252 }
253 more_tasks
254 }
255
256 fn clear(&self) {
257 while self.sync_queue.pop().is_some() {}
258 while self.sync_fixed_queue.try_dequeue().is_ok() {}
259 unsafe { (*self.local_queue.get()).clear() };
260 }
261}
262
263#[derive(Debug, Clone)]
265pub struct RuntimeBuilder {
266 event_interval: usize,
267}
268
269impl Default for RuntimeBuilder {
270 fn default() -> Self {
271 Self::new()
272 }
273}
274
275impl RuntimeBuilder {
276 pub fn new() -> Self {
278 Self { event_interval: 61 }
279 }
280
281 pub fn event_interval(&mut self, val: usize) -> &mut Self {
286 self.event_interval = val;
287 self
288 }
289
290 pub fn build(&self, handle: Box<dyn Notify>) -> Runtime {
292 Runtime::with_builder(self, handle)
293 }
294}