1use std::cell::{Cell, UnsafeCell};
2use std::collections::VecDeque;
3use std::{future::Future, io, sync::Arc, thread};
4
5use async_task::Runnable;
6use crossbeam_queue::SegQueue;
7use swap_buffer_queue::{Queue, buffer::ArrayBuffer, error::TryEnqueueError};
8
9use crate::{driver::Driver, driver::Notify, driver::PollResult, handle::JoinHandle};
10
11scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
12
13pub struct Runtime {
17 stop: Cell<bool>,
18 queue: Arc<RunnableQueue>,
19}
20
21impl Runtime {
22 pub fn new(handle: Box<dyn Notify>) -> Self {
24 Self::builder().build(handle)
25 }
26
27 pub fn builder() -> RuntimeBuilder {
29 RuntimeBuilder::new()
30 }
31
32 #[allow(clippy::arc_with_non_send_sync)]
33 fn with_builder(builder: &RuntimeBuilder, handle: Box<dyn Notify>) -> Self {
34 Self {
35 stop: Cell::new(false),
36 queue: Arc::new(RunnableQueue::new(builder.event_interval, handle)),
37 }
38 }
39
40 pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
46 #[cold]
47 fn not_in_neon_runtime() -> ! {
48 panic!("not in a neon runtime")
49 }
50
51 if CURRENT_RUNTIME.is_set() {
52 CURRENT_RUNTIME.with(f)
53 } else {
54 not_in_neon_runtime()
55 }
56 }
57
58 #[inline]
59 pub fn handle(&self) -> Handle {
61 Handle {
62 queue: self.queue.clone(),
63 }
64 }
65
66 pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
71 unsafe { self.spawn_unchecked(future) }
72 }
73
74 pub unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> JoinHandle<F::Output> {
80 let queue = self.queue.clone();
81 let schedule = move |runnable| {
82 queue.schedule(runnable);
83 };
84 let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
85 runnable.schedule();
86 JoinHandle::new(task)
87 }
88
89 pub fn poll(&self) -> PollResult {
91 if self.stop.get() {
92 PollResult::Ready
93 } else if self.queue.run() {
94 PollResult::PollAgain
95 } else {
96 PollResult::Pending
97 }
98 }
99
100 pub fn block_on<F: Future>(&self, future: F, driver: &dyn Driver) -> F::Output {
103 self.stop.set(false);
104
105 CURRENT_RUNTIME.set(self, || {
106 let mut result = None;
107 unsafe {
108 self.spawn_unchecked(async {
109 result = Some(future.await);
110 self.stop.set(true);
111 let _ = self.queue.handle.notify();
112 });
113 }
114
115 driver.run(self).expect("Driver failed");
116 result.expect("Driver failed to poll")
117 })
118 }
119}
120
121impl Drop for Runtime {
122 fn drop(&mut self) {
123 CURRENT_RUNTIME.set(self, || {
124 self.queue.clear();
125 })
126 }
127}
128
129#[derive(Debug)]
130pub struct Handle {
132 queue: Arc<RunnableQueue>,
133}
134
135impl Handle {
136 pub fn current() -> Handle {
140 Runtime::with_current(|rt| rt.handle())
141 }
142
143 pub fn notify(&self) -> io::Result<()> {
145 self.queue.handle.notify()
146 }
147
148 pub fn spawn<F: Future + Send + 'static>(&self, future: F) -> JoinHandle<F::Output> {
153 let queue = self.queue.clone();
154 let schedule = move |runnable| {
155 queue.schedule(runnable);
156 };
157 let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
158 runnable.schedule();
159 JoinHandle::new(task)
160 }
161}
162
163impl Clone for Handle {
164 fn clone(&self) -> Self {
165 Self {
166 queue: self.queue.clone(),
167 }
168 }
169}
170
171#[derive(Debug)]
172struct RunnableQueue {
173 id: thread::ThreadId,
174 idle: Cell<bool>,
175 handle: Box<dyn Notify>,
176 event_interval: usize,
177 local_queue: UnsafeCell<VecDeque<Runnable>>,
178 sync_fixed_queue: Queue<ArrayBuffer<Runnable, 128>>,
179 sync_queue: SegQueue<Runnable>,
180}
181
182unsafe impl Send for RunnableQueue {}
183unsafe impl Sync for RunnableQueue {}
184
185impl RunnableQueue {
186 fn new(event_interval: usize, handle: Box<dyn Notify>) -> Self {
187 Self {
188 handle,
189 event_interval,
190 id: thread::current().id(),
191 idle: Cell::new(true),
192 local_queue: UnsafeCell::new(VecDeque::new()),
193 sync_fixed_queue: Queue::default(),
194 sync_queue: SegQueue::new(),
195 }
196 }
197
198 fn schedule(&self, runnable: Runnable) {
199 if self.id == thread::current().id() {
200 unsafe { (*self.local_queue.get()).push_back(runnable) };
201 if self.idle.get() {
202 self.idle.set(false);
203 self.handle.notify().ok();
204 }
205 } else {
206 let result = self.sync_fixed_queue.try_enqueue([runnable]);
207 if let Err(TryEnqueueError::InsufficientCapacity([runnable])) = result {
208 self.sync_queue.push(runnable);
209 }
210 self.handle.notify().ok();
211 }
212 }
213
214 fn run(&self) -> bool {
215 self.idle.set(false);
216
217 for _ in 0..self.event_interval {
218 let task = unsafe { (*self.local_queue.get()).pop_front() };
219 if let Some(task) = task {
220 task.run();
221 } else {
222 break;
223 }
224 }
225
226 if let Ok(buf) = self.sync_fixed_queue.try_dequeue() {
227 for task in buf {
228 task.run();
229 }
230 }
231
232 for _ in 0..self.event_interval {
233 if !self.sync_queue.is_empty() {
234 if let Some(task) = self.sync_queue.pop() {
235 task.run();
236 continue;
237 }
238 }
239 break;
240 }
241 self.idle.set(true);
242
243 !unsafe { (*self.local_queue.get()).is_empty() }
244 || !self.sync_fixed_queue.is_empty()
245 || !self.sync_queue.is_empty()
246 }
247
248 fn clear(&self) {
249 while self.sync_queue.pop().is_some() {}
250 while self.sync_fixed_queue.try_dequeue().is_ok() {}
251 unsafe { (*self.local_queue.get()).clear() };
252 }
253}
254
255#[derive(Debug, Clone)]
257pub struct RuntimeBuilder {
258 event_interval: usize,
259}
260
261impl Default for RuntimeBuilder {
262 fn default() -> Self {
263 Self::new()
264 }
265}
266
267impl RuntimeBuilder {
268 pub fn new() -> Self {
270 Self { event_interval: 61 }
271 }
272
273 pub fn event_interval(&mut self, val: usize) -> &mut Self {
278 self.event_interval = val;
279 self
280 }
281
282 pub fn build(&self, handle: Box<dyn Notify>) -> Runtime {
284 Runtime::with_builder(self, handle)
285 }
286}