1use std::cell::{Cell, UnsafeCell};
2use std::collections::VecDeque;
3use std::{future::Future, io, sync::Arc, thread};
4
5use async_task::Runnable;
6use crossbeam_queue::SegQueue;
7use swap_buffer_queue::{Queue, buffer::ArrayBuffer, error::TryEnqueueError};
8
9use crate::{driver::Driver, driver::Notify, driver::PollResult, handle::JoinHandle};
10
11scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
12
13#[derive(Debug)]
14pub struct Runtime {
18 stop: Cell<bool>,
19 queue: Arc<RunnableQueue>,
20}
21
22impl Runtime {
23 pub fn new(handle: Box<dyn Notify>) -> Self {
25 Self::builder().build(handle)
26 }
27
28 pub fn builder() -> RuntimeBuilder {
30 RuntimeBuilder::new()
31 }
32
33 #[allow(clippy::arc_with_non_send_sync)]
34 fn with_builder(builder: &RuntimeBuilder, handle: Box<dyn Notify>) -> Self {
35 Self {
36 stop: Cell::new(false),
37 queue: Arc::new(RunnableQueue::new(builder.event_interval, handle)),
38 }
39 }
40
41 pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
47 #[cold]
48 fn not_in_neon_runtime() -> ! {
49 panic!("not in a neon runtime")
50 }
51
52 if CURRENT_RUNTIME.is_set() {
53 CURRENT_RUNTIME.with(f)
54 } else {
55 not_in_neon_runtime()
56 }
57 }
58
59 #[inline]
60 pub fn handle(&self) -> Handle {
62 Handle {
63 queue: self.queue.clone(),
64 }
65 }
66
67 pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
72 unsafe { self.spawn_unchecked(future) }
73 }
74
75 pub unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> JoinHandle<F::Output> {
81 let queue = self.queue.clone();
82 let schedule = move |runnable| {
83 queue.schedule(runnable);
84 };
85 let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
86 runnable.schedule();
87 JoinHandle::new(task)
88 }
89
90 pub fn poll(&self) -> PollResult {
92 if self.stop.get() {
93 PollResult::Ready
94 } else if self.queue.run() {
95 PollResult::PollAgain
96 } else {
97 PollResult::Pending
98 }
99 }
100
101 pub fn block_on<F: Future>(&self, future: F, driver: &dyn Driver) -> F::Output {
109 self.stop.set(false);
110
111 CURRENT_RUNTIME.set(self, || {
112 let mut result = None;
113 unsafe {
114 self.spawn_unchecked(async {
115 result = Some(future.await);
116 self.stop.set(true);
117 let _ = self.queue.handle.notify();
118 });
119 }
120
121 ntex_error::set_backtrace_start_alt("src/raw.rs", 0);
122 driver.run(self).expect("Driver failed");
123 result.expect("Driver failed to poll")
124 })
125 }
126}
127
128impl Drop for Runtime {
129 fn drop(&mut self) {
130 CURRENT_RUNTIME.set(self, || {
131 self.queue.clear();
132 });
133 }
134}
135
136#[derive(Debug)]
137pub struct Handle {
139 queue: Arc<RunnableQueue>,
140}
141
142impl Handle {
143 pub fn current() -> Handle {
147 Runtime::with_current(Runtime::handle)
148 }
149
150 pub fn notify(&self) -> io::Result<()> {
152 self.queue.handle.notify()
153 }
154
155 pub fn spawn<F: Future + Send + 'static>(&self, future: F) -> JoinHandle<F::Output> {
160 let queue = self.queue.clone();
161 let schedule = move |runnable| {
162 queue.schedule(runnable);
163 };
164 let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
165 runnable.schedule();
166 JoinHandle::new(task)
167 }
168}
169
170impl Clone for Handle {
171 fn clone(&self) -> Self {
172 Self {
173 queue: self.queue.clone(),
174 }
175 }
176}
177
178#[derive(Debug)]
179struct RunnableQueue {
180 id: thread::ThreadId,
181 idle: Cell<bool>,
182 handle: Box<dyn Notify>,
183 event_interval: usize,
184 local_queue: UnsafeCell<VecDeque<Runnable>>,
185 sync_fixed_queue: Queue<ArrayBuffer<Runnable, 128>>,
186 sync_queue: SegQueue<Runnable>,
187}
188
189unsafe impl Send for RunnableQueue {}
190unsafe impl Sync for RunnableQueue {}
191
192impl RunnableQueue {
193 fn new(event_interval: usize, handle: Box<dyn Notify>) -> Self {
194 Self {
195 handle,
196 event_interval,
197 id: thread::current().id(),
198 idle: Cell::new(true),
199 local_queue: UnsafeCell::new(VecDeque::new()),
200 sync_fixed_queue: Queue::default(),
201 sync_queue: SegQueue::new(),
202 }
203 }
204
205 fn schedule(&self, runnable: Runnable) {
206 if self.id == thread::current().id() {
207 unsafe { (*self.local_queue.get()).push_back(runnable) };
208 if self.idle.get() {
209 self.idle.set(false);
210 self.handle.notify().ok();
211 }
212 } else {
213 let result = self.sync_fixed_queue.try_enqueue([runnable]);
214 if let Err(TryEnqueueError::InsufficientCapacity([runnable])) = result {
215 self.sync_queue.push(runnable);
216 }
217 self.handle.notify().ok();
218 }
219 }
220
221 fn run(&self) -> bool {
222 for _ in 0..self.event_interval {
223 let task = unsafe { (*self.local_queue.get()).pop_front() };
224 if let Some(task) = task {
225 task.run();
226 } else {
227 break;
228 }
229 }
230
231 if let Ok(buf) = self.sync_fixed_queue.try_dequeue() {
232 for task in buf {
233 task.run();
234 }
235 }
236
237 for _ in 0..self.event_interval {
238 if !self.sync_queue.is_empty()
239 && let Some(task) = self.sync_queue.pop()
240 {
241 task.run();
242 continue;
243 }
244 break;
245 }
246
247 let more_tasks = !unsafe { (*self.local_queue.get()).is_empty() }
248 || !self.sync_fixed_queue.is_empty()
249 || !self.sync_queue.is_empty();
250
251 if !more_tasks {
252 self.idle.set(true);
253 }
254 more_tasks
255 }
256
257 fn clear(&self) {
258 while self.sync_queue.pop().is_some() {}
259 while self.sync_fixed_queue.try_dequeue().is_ok() {}
260 unsafe { (*self.local_queue.get()).clear() };
261 }
262}
263
264#[derive(Debug, Clone)]
266pub struct RuntimeBuilder {
267 event_interval: usize,
268}
269
270impl Default for RuntimeBuilder {
271 fn default() -> Self {
272 Self::new()
273 }
274}
275
276impl RuntimeBuilder {
277 pub fn new() -> Self {
279 Self { event_interval: 61 }
280 }
281
282 pub fn event_interval(&mut self, val: usize) -> &mut Self {
287 self.event_interval = val;
288 self
289 }
290
291 pub fn build(&self, handle: Box<dyn Notify>) -> Runtime {
293 Runtime::with_builder(self, handle)
294 }
295}