1#![doc(hidden)]
3use std::collections::VecDeque;
41use std::sync::Arc;
42use core::cell::RefCell;
43use core::future::Future;
44use core::pin::Pin;
45use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
46
47use tracing::{debug_span, Span};
48use core::sync::atomic::{AtomicU64, Ordering};
49use std::thread::ThreadId;
50use std::sync::Mutex;
51use std::collections::HashMap;
52use lazy_static::lazy_static;
53
/// Optional process-wide callback used to nudge another thread after work is
/// injected into its queue (e.g. to interrupt an OS-level wait). Poisoning is
/// recovered from so a panicked holder cannot disable wakeups for everyone.
static WAKEUP_HANDLER: Mutex<Option<fn(ThreadId)>> = Mutex::new(None);

/// Registers the wakeup callback invoked by `signal_wakeup`.
#[inline]
pub fn set_wakeup_handler(handler: fn(ThreadId)) {
    *WAKEUP_HANDLER.lock().unwrap_or_else(|e| e.into_inner()) = Some(handler);
}

/// Invokes the registered wakeup callback (if any) for `id`.
///
/// The handler is copied out and the mutex guard dropped *before* the call:
/// holding the lock across the callback deadlocks if the handler itself calls
/// `set_wakeup_handler` or `signal_wakeup` (an `if let` on the locked value
/// keeps the guard alive for the whole body in the 2021 edition).
#[inline]
fn signal_wakeup(id: ThreadId) {
    let handler = *WAKEUP_HANDLER.lock().unwrap_or_else(|e| e.into_inner());
    if let Some(handler) = handler {
        handler(id);
    }
}
71
// Monotonically increasing id used only to label each task's tracing span.
static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);

/// A spawned unit of work: a pinned future plus the thread that owns it.
struct Task {
    // Thread whose RUN_QUEUE this task must be polled on; wakes from other
    // threads route through that thread's injection queue instead.
    owner_thread: ThreadId,
    // The task body. RefCell gives single-owner interior mutability; only
    // `poll` borrows it (see the Send/Sync note below).
    future: RefCell<Pin<Box<dyn Future<Output = ()>>>>,
    // Tracing span entered around every poll of this task.
    span: Span,
}

// SAFETY(review): `RefCell` and the un-Send boxed future are not thread-safe
// by themselves. These impls appear to rely on the invariant that only
// `owner_thread` ever polls (i.e. borrows `future`); other threads merely
// move the Arc through the injection queues and never touch its interior.
// TODO: confirm no code path borrows `future` off the owner thread.
unsafe impl Send for Task {}
unsafe impl Sync for Task {}
82
thread_local! {
    // Per-thread queue of tasks ready to be polled by this thread's executor.
    static RUN_QUEUE: RefCell<VecDeque<Arc<Task>>> = RefCell::new(VecDeque::new());
    // Cooperative per-thread shutdown flag; see `stop` / `is_stopped`.
    static STOP: RefCell<bool> = RefCell::new(false);
    // Cached id of the current thread (avoids repeated `current()` calls).
    static THREAD_ID: ThreadId = std::thread::current().id();
}
88
89lazy_static! {
90 static ref INJECTION_QUEUES: Mutex<HashMap<ThreadId, (Arc<Mutex<VecDeque<Arc<Task>>>>, Arc<core::sync::atomic::AtomicBool>)>> = Mutex::new(HashMap::new());
91}
92
93fn get_injection_lock() -> std::sync::MutexGuard<'static, HashMap<ThreadId, (Arc<Mutex<VecDeque<Arc<Task>>>>, Arc<core::sync::atomic::AtomicBool>)>> {
94 INJECTION_QUEUES.lock().unwrap_or_else(|e| e.into_inner())
95}
96
97thread_local! {
98 static MY_INJECTION: RefCell<Option<(Arc<Mutex<VecDeque<Arc<Task>>>>, Arc<core::sync::atomic::AtomicBool>)>> = RefCell::new(None);
99}
100
101fn get_my_injection() -> (Arc<Mutex<VecDeque<Arc<Task>>>>, Arc<core::sync::atomic::AtomicBool>) {
102 MY_INJECTION.with(|my| {
103 let mut my = my.borrow_mut();
104 if let Some(cached) = &*my {
105 return cached.clone();
106 }
107 let current = THREAD_ID.with(|id| *id);
108 let mut queues = get_injection_lock();
109 let pair = queues.entry(current).or_insert_with(|| (Arc::new(Mutex::new(VecDeque::new())), Arc::new(core::sync::atomic::AtomicBool::new(false)))).clone();
110 *my = Some(pair.clone());
111 pair
112 })
113}
114
115pub fn stop() {
117 STOP.with(|s| *s.borrow_mut() = true);
118}
119
120pub fn is_stopped() -> bool {
122 STOP.with(|s| *s.borrow())
123}
124
125impl Task {
126 fn poll(self: Arc<Self>) {
127 let _enter = self.span.enter();
128 let waker = unsafe { Waker::from_raw(self.clone().raw_waker()) };
129 let mut cx = Context::from_waker(&waker);
130
131 let mut future = self.future.borrow_mut();
132 match future.as_mut().poll(&mut cx) {
133 Poll::Ready(_) => {}
134 Poll::Pending => {}
135 }
136 }
137
138 fn raw_waker(self: Arc<Self>) -> RawWaker {
139 let ptr = Arc::into_raw(self) as *const ();
140 RawWaker::new(ptr, &VTABLE)
141 }
142}
143
// Waker vtable backing `Task::raw_waker`. The data pointer is always a
// `*const Task` produced by `Arc::into_raw`, so each entry below manages the
// Arc's strong count accordingly.
static VTABLE: RawWakerVTable = RawWakerVTable::new(
    clone_waker,
    wake,
    wake_by_ref,
    drop_waker,
);
150
151unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
152 let arc = Arc::from_raw(ptr as *const Task);
153 let cloned = arc.clone();
154 let _ = Arc::into_raw(arc);
155 let new_ptr = Arc::into_raw(cloned) as *const ();
156 RawWaker::new(new_ptr, &VTABLE)
157}
158
159unsafe fn wake(ptr: *const ()) {
160 let arc = Arc::from_raw(ptr as *const Task);
161 let owner = arc.owner_thread;
162 let current = THREAD_ID.with(|id| *id);
163
164 if owner == current {
165 RUN_QUEUE.with(|q| {
166 q.borrow_mut().push_back(arc);
167 });
168 } else {
169 let (queue, flag) = {
170 let mut queues = get_injection_lock();
171 queues.entry(owner).or_insert_with(|| (Arc::new(Mutex::new(VecDeque::new())), Arc::new(core::sync::atomic::AtomicBool::new(false)))).clone()
172 };
173 queue.lock().unwrap_or_else(|e| e.into_inner()).push_back(arc);
174 flag.store(true, Ordering::Release);
175 signal_wakeup(owner);
176 }
177}
178
179unsafe fn wake_by_ref(ptr: *const ()) {
180 let arc = Arc::from_raw(ptr as *const Task);
181 let owner = arc.owner_thread;
182 let current = THREAD_ID.with(|id| *id);
183
184 if owner == current {
185 RUN_QUEUE.with(|q| {
186 q.borrow_mut().push_back(arc.clone());
187 });
188 } else {
189 let (queue, flag) = {
190 let mut queues = get_injection_lock();
191 queues.entry(owner).or_insert_with(|| (Arc::new(Mutex::new(VecDeque::new())), Arc::new(core::sync::atomic::AtomicBool::new(false)))).clone()
192 };
193 queue.lock().unwrap_or_else(|e| e.into_inner()).push_back(arc.clone());
194 flag.store(true, Ordering::Release);
195 signal_wakeup(owner);
196 }
197 let _ = Arc::into_raw(arc);
198}
199
200unsafe fn drop_waker(ptr: *const ()) {
201 drop(Arc::from_raw(ptr as *const Task));
202}
203
204pub fn spawn<F>(future: F)
208where
209 F: Future<Output = ()> + 'static,
210{
211 let _ = get_my_injection();
212
213 let id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
214 let span = debug_span!("task", id = id);
215
216 let task = Arc::new(Task {
217 owner_thread: THREAD_ID.with(|id| *id),
218 future: RefCell::new(Box::pin(future)),
219 span,
220 });
221
222 RUN_QUEUE.with(|q| {
223 q.borrow_mut().push_back(task);
224 });
225}
226
227pub fn has_pending_tasks() -> bool {
229 let local_empty = RUN_QUEUE.with(|q| q.borrow().is_empty());
230 if !local_empty { return true; }
231
232 let (queue, flag) = get_my_injection();
233 if !flag.load(Ordering::Acquire) {
234 return false;
235 }
236
237 let remote = queue.lock().unwrap_or_else(|e| e.into_inner());
238 let res = !remote.is_empty();
239 if !res {
240 flag.store(false, Ordering::Release);
241 }
242 res
243}
244
245pub struct Executor;
247
248impl Executor {
249 pub fn new() -> Self {
251 let _ = get_my_injection();
252 Self
253 }
254
255 pub fn run_until_idle(&self) {
259 self.drain_injection_queue();
260
261 let mut processed = 0;
262 const POLL_BUDGET: usize = 128;
263
264 while processed < POLL_BUDGET {
265 let task = RUN_QUEUE.with(|q| q.borrow_mut().pop_front());
266
267 match task {
268 Some(task) => {
269 task.poll();
270 processed += 1;
271 }
272 None => break,
273 }
274 }
275 }
276
277 fn drain_injection_queue(&self) {
278 let (queue, flag) = get_my_injection();
279 let mut remote = queue.lock().unwrap_or_else(|e| e.into_inner());
280 if !remote.is_empty() {
281 RUN_QUEUE.with(|q| {
282 let mut local = q.borrow_mut();
283 while let Some(task) = remote.pop_front() {
284 local.push_back(task);
285 }
286 });
287 flag.store(false, Ordering::Release);
288 } else {
289 flag.store(false, Ordering::Release);
290 }
291 }
292}
293
/// Creates a future that reschedules the current task once before completing.
pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// Future returned by [`yield_now`]: pending on its first poll (after
/// re-arming its waker), ready on every poll thereafter.
pub struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // First poll: flip the flag, ask to be polled again, report Pending.
        // Every later poll: done.
        if core::mem::replace(&mut self.yielded, true) {
            Poll::Ready(())
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}