1use std::cell::{Cell, UnsafeCell};
2use std::collections::VecDeque;
3use std::{future::Future, io, sync::Arc, thread};
4
5use async_task::Runnable;
6use crossbeam_queue::SegQueue;
7use swap_buffer_queue::{Queue, buffer::ArrayBuffer, error::TryEnqueueError};
8
9use crate::{driver::Driver, driver::Notify, driver::PollResult, handle::JoinHandle};
10
11scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
12
13pub struct Runtime {
17 stop: Cell<bool>,
18 queue: Arc<RunnableQueue>,
19}
20
21impl Runtime {
22 pub fn new(handle: Box<dyn Notify>) -> Self {
24 Self::builder().build(handle)
25 }
26
27 pub fn builder() -> RuntimeBuilder {
29 RuntimeBuilder::new()
30 }
31
32 #[allow(clippy::arc_with_non_send_sync)]
33 fn with_builder(builder: &RuntimeBuilder, handle: Box<dyn Notify>) -> Self {
34 Self {
35 stop: Cell::new(false),
36 queue: Arc::new(RunnableQueue::new(builder.event_interval, handle)),
37 }
38 }
39
40 pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
46 #[cold]
47 fn not_in_neon_runtime() -> ! {
48 panic!("not in a neon runtime")
49 }
50
51 if CURRENT_RUNTIME.is_set() {
52 CURRENT_RUNTIME.with(f)
53 } else {
54 not_in_neon_runtime()
55 }
56 }
57
58 #[inline]
59 pub fn handle(&self) -> Handle {
61 Handle {
62 queue: self.queue.clone(),
63 }
64 }
65
66 pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
71 unsafe { self.spawn_unchecked(future) }
72 }
73
74 pub unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> JoinHandle<F::Output> {
80 let queue = self.queue.clone();
81 let schedule = move |runnable| {
82 queue.schedule(runnable);
83 };
84 let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
85 runnable.schedule();
86 JoinHandle::new(task)
87 }
88
89 pub fn poll(&self) -> PollResult {
91 if self.stop.get() {
92 PollResult::Ready
93 } else if self.queue.run() {
94 PollResult::PollAgain
95 } else {
96 PollResult::Pending
97 }
98 }
99
100 pub fn block_on<F: Future>(&self, future: F, driver: &dyn Driver) -> F::Output {
103 self.stop.set(false);
104
105 CURRENT_RUNTIME.set(self, || {
106 let mut result = None;
107 unsafe {
108 self.spawn_unchecked(async {
109 result = Some(future.await);
110 self.stop.set(true);
111 let _ = self.queue.handle.notify();
112 });
113 }
114
115 driver.run(self).expect("Driver failed");
116 result.expect("Driver failed to poll")
117 })
118 }
119}
120
121impl Drop for Runtime {
122 fn drop(&mut self) {
123 CURRENT_RUNTIME.set(self, || {
124 self.queue.clear();
125 })
126 }
127}
128
129#[derive(Debug)]
130pub struct Handle {
132 queue: Arc<RunnableQueue>,
133}
134
135impl Handle {
136 pub fn current() -> Handle {
140 Runtime::with_current(|rt| rt.handle())
141 }
142
143 pub fn notify(&self) -> io::Result<()> {
145 self.queue.handle.notify()
146 }
147
148 pub fn spawn<F: Future + Send + 'static>(&self, future: F) -> JoinHandle<F::Output> {
153 let queue = self.queue.clone();
154 let schedule = move |runnable| {
155 queue.schedule(runnable);
156 };
157 let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
158 runnable.schedule();
159 JoinHandle::new(task)
160 }
161}
162
163impl Clone for Handle {
164 fn clone(&self) -> Self {
165 Self {
166 queue: self.queue.clone(),
167 }
168 }
169}
170
171#[derive(Debug)]
172struct RunnableQueue {
173 id: thread::ThreadId,
174 idle: Cell<bool>,
175 handle: Box<dyn Notify>,
176 event_interval: usize,
177 local_queue: UnsafeCell<VecDeque<Runnable>>,
178 sync_fixed_queue: Queue<ArrayBuffer<Runnable, 128>>,
179 sync_queue: SegQueue<Runnable>,
180}
181
182unsafe impl Send for RunnableQueue {}
183unsafe impl Sync for RunnableQueue {}
184
185impl RunnableQueue {
186 fn new(event_interval: usize, handle: Box<dyn Notify>) -> Self {
187 Self {
188 handle,
189 event_interval,
190 id: thread::current().id(),
191 idle: Cell::new(true),
192 local_queue: UnsafeCell::new(VecDeque::new()),
193 sync_fixed_queue: Queue::default(),
194 sync_queue: SegQueue::new(),
195 }
196 }
197
198 fn schedule(&self, runnable: Runnable) {
199 if self.id == thread::current().id() {
200 unsafe { (*self.local_queue.get()).push_back(runnable) };
201 if self.idle.get() {
202 self.idle.set(false);
203 self.handle.notify().ok();
204 }
205 } else {
206 let result = self.sync_fixed_queue.try_enqueue([runnable]);
207 if let Err(TryEnqueueError::InsufficientCapacity([runnable])) = result {
208 self.sync_queue.push(runnable);
209 }
210 self.handle.notify().ok();
211 }
212 }
213
214 fn run(&self) -> bool {
215 for _ in 0..self.event_interval {
216 let task = unsafe { (*self.local_queue.get()).pop_front() };
217 if let Some(task) = task {
218 task.run();
219 } else {
220 break;
221 }
222 }
223
224 if let Ok(buf) = self.sync_fixed_queue.try_dequeue() {
225 for task in buf {
226 task.run();
227 }
228 }
229
230 for _ in 0..self.event_interval {
231 if !self.sync_queue.is_empty() {
232 if let Some(task) = self.sync_queue.pop() {
233 task.run();
234 continue;
235 }
236 }
237 break;
238 }
239
240 let more_tasks = !unsafe { (*self.local_queue.get()).is_empty() }
241 || !self.sync_fixed_queue.is_empty()
242 || !self.sync_queue.is_empty();
243
244 if !more_tasks {
245 self.idle.set(true);
246 }
247 more_tasks
248 }
249
250 fn clear(&self) {
251 while self.sync_queue.pop().is_some() {}
252 while self.sync_fixed_queue.try_dequeue().is_ok() {}
253 unsafe { (*self.local_queue.get()).clear() };
254 }
255}
256
/// Configuration for constructing a [`Runtime`].
#[derive(Debug, Clone)]
pub struct RuntimeBuilder {
    /// Upper bound on the number of tasks each bounded loop of the task queue
    /// executes per poll.
    event_interval: usize,
}
262
263impl Default for RuntimeBuilder {
264 fn default() -> Self {
265 Self::new()
266 }
267}
268
269impl RuntimeBuilder {
270 pub fn new() -> Self {
272 Self { event_interval: 61 }
273 }
274
275 pub fn event_interval(&mut self, val: usize) -> &mut Self {
280 self.event_interval = val;
281 self
282 }
283
284 pub fn build(&self, handle: Box<dyn Notify>) -> Runtime {
286 Runtime::with_builder(self, handle)
287 }
288}